Reorganized runtime into theories

Created several theory classes (containers for rules + data)
and moved functionality previously part of Runtime class
into one of those theories.

All tests pass

Issue: #
Change-Id: I3643f6c35b6e355a20d74e74f3be9afab14ba5cd
This commit is contained in:
Tim Hinrichs 2013-10-03 09:04:08 -07:00
parent 3259680709
commit bcc8dc0417
2 changed files with 461 additions and 211 deletions

View File

@ -19,76 +19,6 @@ class Tracer(object):
class CongressRuntime (Exception):
## Delta Rules
class DeltaRule(object):
def __init__(self, trigger, head, body, original):
self.trigger = trigger # atom
self.head = head # atom
self.body = body # list of atoms with is_negated()
self.original = original # Rule from which derived
def __str__(self):
return "<trigger: {}, head: {}, body: {}>".format(
str(self.trigger), str(self.head), [str(lit) for lit in self.body])
def __eq__(self, other):
return (self.trigger == other.trigger and
self.head == other.head and
len(self.body) == len(other.body) and
all(self.body[i] == other.body[i]
for i in xrange(0, len(self.body))))
class DeltaRuleTheory (object):
""" A collection of DeltaRules. """
def __init__(self, rules=None):
# dictionary from table name to list of rules with that table as trigger
self.contents = {}
# dictionary from table name to number of rules with that table in head
self.views = {}
if rules is not None:
for rule in rules:
def modify(self, delta, is_insert):
if is_insert is True:
return self.insert(delta)
return self.delete(delta)
def insert(self, delta):
if delta.head.table in self.views:
self.views[delta.head.table] += 1
self.views[delta.head.table] = 1
if delta.trigger.table not in self.contents:
self.contents[delta.trigger.table] = [delta]
def delete(self, delta):
if delta.head.table in self.views:
self.views[delta.head.table] -= 1
if self.views[delta.head.table] == 0:
del self.views[delta.head.table]
if delta.trigger.table not in self.contents:
def __str__(self):
return str(self.contents)
def rules_with_trigger(self, table):
if table not in self.contents:
return []
return self.contents[table]
def is_view(self, x):
return x in self.views
## Events
@ -442,158 +372,316 @@ class Database(object):
## Runtime classes
## Logical Building Blocks
class Runtime (object):
""" Runtime for the Congress policy language. Only have one instantiation
in practice, but using a class is natural and useful for testing. """
class Proof(object):
""" A single proof. Differs semantically from Database's
Proof in that this verison represents a proof that spans rules,
instead of just a proof for a single rule. """
def __init__(self, root, children):
self.root = root
self.children = children
class Proof(object):
""" A single proof. Differs semantically from Database's
Proof in that this verison represents a proof that spans rules,
instead of just a proof for a single rule. """
def __init__(self, root, children):
self.root = root
self.children = children
def __str__(self):
return self.str_tree(0)
def __str__(self):
return self.str_tree(0)
def str_tree(self, depth):
s = " " * depth
s += str(self.root)
s += "\n"
for child in self.children:
s += child.str_tree(depth + 1)
return s
def str_tree(self, depth):
s = " " * depth
s += str(self.root)
s += "\n"
for child in self.children:
s += child.str_tree(depth + 1)
return s
class DeltaRule(object):
def __init__(self, trigger, head, body, original):
self.trigger = trigger # atom
self.head = head # atom
self.body = body # list of atoms with is_negated()
self.original = original # Rule from which derived
def __str__(self):
return "<trigger: {}, head: {}, body: {}>".format(
str(self.trigger), str(self.head), [str(lit) for lit in self.body])
def __eq__(self, other):
return (self.trigger == other.trigger and
self.head == other.head and
len(self.body) == len(other.body) and
all(self.body[i] == other.body[i]
for i in xrange(0, len(self.body))))
class Unifier (object):
""" Store assignments of variables.
Assignments are not transitively closed, so
applying { x->y, y->z } to x yields z. """
def __init__(self):
self.bindings = {}
def add(self, var, value):
""" Add the variable assignment VAR -> VALUE.
Returns an object that when given to DELETE,
undoes the ADD. """
self.bindings[var] = value
return var
def apply(self, thing):
""" Return value that THING is assigned, or THING. """
# transitively close bindings
if self.bindings.has_key(thing):
return self.apply(self.bindings[thing])
return thing
def delete(self, thing):
""" Deletes assignment of THING
from binding list."""
if thing is not None:
del self.bindings[thing]
def unify(self, thing1, thing2):
""" Modify SELF if possible to make thing1 == thing2.
Return 2 values: whether it is possible and the variable
changed to perform the unifier or None. """
val1 = self.apply(thing1)
val2 = self.apply(thing2)
if val1 == val2:
return (True, None)
if self.is_variable(val1):
change = self.add(val1, val2)
return (True, change)
elif self.is_variable(val2):
change = self.add(val2, val1)
return (True, change)
return (False, None)
def is_variable (self, thing):
""" Checks whether THING is a variable. """
return isinstance(thing, basestring) and thing[0] == '?'
except TypeError:
return False
def plug(self, thing):
""" Creates a copy of thing but with all variables replaced
as per the unifier. Given an iterable, recursively applies
itself to each element of the iterable, putting the result
in a list. """
if isinstance(thing, (basestring, int, long, float)):
return self.apply(thing)
elif isinstance(thing, dict):
new = {}
for key in thing:
newkey = self.plug(key)
newvalue = self.plug(thing[key])
new[newkey] = newvalue
return new
elif isinstance(thing, set):
new = set()
for x in thing:
newx = self.plug(x)
return new
elif isinstance(thing, list):
return [self.plug(x) for x in thing]
elif isinstance(thing, tuple):
return tuple([self.plug(x) for x in thing])
raise TypeError(
"Plug does not handle type {}".format(type(thing)))
def __str__ (self):
return str(self.bindings)
## Theories
class RuleTheory(object):
""" A collection of Rules. """
def __init__(self, rules=None):
# rules dictating how an insert/delete to one table
# affects other tables
self.delta_rules = DeltaRuleTheory(rules)
# dictionary from table name to list of rules with that table in head
self.contents = {}
# list of other theories that are implicitly included in this one
self.includes = []
if rules is not None:
for rule in rules:
def __str__(self):
return str(self.contents)
def insert(self, rule):
table = rule.head.table
if table in self.contents:
if rule not in self.content[table]: # eliminate dups
self.contents[table] = [rule]
def delete(self, rule):
table = rule.head.table
if table in self.contents:
def select(self, query):
""" Return tuples in which QUERY is true. Current implementation
ignores rules with non-empty bodies and requires QUERY to be atomic."""
assert isinstance(query, compile.Atom), "Only have support for atoms"
results = set()
for rule in self.contents[query.table]:
if not isinstance(query, compile.Atom):
if match(query, rule) is not None:
results.add(tuple([ for arg in rule.arguments]))
# simple implementation b/c we're ignoring real rules.
# full implementation will need to check other theories in
# inner evaluation loop.
for theory in self.includes:
results |=
return results
def match(atom1, atom2):
""" Return Unifier, if it exists, that when applied to
ATOM1 results in the ground ATOM2. """
if len(atom1.arguments) != len(atom2.arguments):
return None
assert (all(not arg.is_variable() for arg in atom2.arguments),
"Match requires ATOM2 have no variables")
binding = Unifier()
for i in xrange(0, len(atom1.arguments)):
arg = atom1.arguments[i]
if arg.is_variable():
if in binding:
oldval = binding.apply(
if oldval != atom2.arguments[i]:
return None
binding.add(, atom2.arguments[i])
return binding
def return_true(*args):
return True
def abduce(self, query, abducibles, consistency=return_true):
""" Compute a collection of atoms with ABDUCIBLES in the head
that when added to SELF makes query QUERY true (for some
instance of QUERY). """
assert False, "Not yet implemented"
class TopDownContext(object):
def __init__(self, literals, literal_index, binding):
self.literals = literals
self.literal_index = literal_index
self.binding = binding
def top_down_evaluation(self, literals, literal_index, context, binding):
""" Compute all instances of LITERALS (from LITERAL_INDEX and above)
that are true according to the theory (after applying the
unifier BINDING to LITERALS). Returns a list of dictionary
bindings. """
assert False, "Not yet implemented"
class DeltaRuleTheory (object):
""" A collection of DeltaRules. """
def __init__(self, rules=None):
# dictionary from table name to list of rules with that table as trigger
self.contents = {}
# list of theories implicitly included in this one
self.includes = []
# dictionary from table name to number of rules with that table in head
self.views = {}
if rules is not None:
for rule in rules:
def insert(self, delta):
if delta.head.table in self.views:
self.views[delta.head.table] += 1
self.views[delta.head.table] = 1
if delta.trigger.table not in self.contents:
self.contents[delta.trigger.table] = [delta]
def delete(self, delta):
if delta.head.table in self.views:
self.views[delta.head.table] -= 1
if self.views[delta.head.table] == 0:
del self.views[delta.head.table]
if delta.trigger.table not in self.contents:
def modify(self, delta, is_insert):
if is_insert is True:
return self.insert(delta)
return self.delete(delta)
def __str__(self):
return str(self.contents)
def rules_with_trigger(self, table):
if table not in self.contents:
return []
return self.contents[table]
def is_view(self, x):
return x in self.views
class MaterializedRuleTheory(object):
""" A theory that stores the table contents explicitly. """
def __init__(self):
# queue of events left to process
self.queue = EventQueue()
# collection of all tables
self.database = Database()
# tracer object
self.tracer = Tracer()
# rules that dictate how database changes in response to events
self.delta_rules = DeltaRuleTheory()
def log(self, table, msg, depth=0):
self.tracer.log(table, "RT: " + msg, depth)
############### External interface ###############
def load_file(self, filename):
""" Compile the given FILENAME and insert each of the statements
into the runtime. """
compiler = compile.get_compiled([filename])
for formula in compiler.theory:
############### External Interface ###############
def select(self, query):
""" Event handler for arbitrary queries. Returns the set of
all instantiated QUERY that are true. """
if isinstance(query, basestring):
return self.select_string(query)
return self.select_obj(query)
def select_if(self, query, temporary_data):
""" Event handler for hypothetical queries. Returns the set of
all instantiated QUERYs that would be true IF
TEMPORARY_DATA were true. """
if isinstance(query, basestring):
return self.select_if_string(query, temporary_data)
return self.select_if_obj(query, temporary_data)
def explain(self, query):
""" Event handler for explanations. Given a ground query, return
a single proof that it belongs in the database. """
if isinstance(query, basestring):
return self.explain_string(query)
return self.explain_obj(query)
def insert(self, formula):
""" Event handler for arbitrary insertion (rules and facts). """
if isinstance(formula, basestring):
return self.insert_string(formula)
return self.insert_obj(formula)
def delete(self, formula):
""" Event handler for arbitrary deletion (rules and facts). """
if isinstance(formula, basestring):
return self.delete_string(formula)
return self.delete_obj(formula)
############### External typed interface ###############
def select_obj(self, query):
# should generalize to at least a (conjunction of atoms)
# should generalize to at least a conjunction of atoms.
# Need to change compiler a bit, but runtime should be fine.
assert ((isinstance(query, compile.Atom) or
isinstance(query, compile.Rule)),
"Only have support for atoms")
def select_string(self, policy_string):
def str_tuple_atom (atom):
s = atom[0]
s += '('
s += ', '.join([str(x) for x in atom[1:]])
s += ')'
return s
c = compile.get_compiled(['--input_string', policy_string])
assert len(c.theory) == 1, "Queries can have only 1 statement: {}".format(
[str(x) for x in c.theory])
results = self.select_obj(c.theory[0])
return " ".join([str(x) for x in results])
def select_if_obj(self, query, temporary_data):
assert False, "Not yet implemented"
def select_if_string(self, query_string, temporary_data):
assert False, "Not yet implemented"
def explain_obj(self, query):
assert isinstance(query, compile.Atom), "Only have support for literals"
return self.explain_aux(query, 0)
def explain_string(self, query_string):
c = compile.get_compiled([query_string, '--input_string'])
assert len(c.theory) == 1, "Queries can have only 1 statement"
assert c.theory[0].is_atom(), "Queries must be atomic"
results = self.explain_obj(c.theory[0])
return str(results)
def insert_obj(self, formula):
def insert(self, formula):
return self.modify(formula, is_insert=True)
def insert_string(self, policy_string):
c = compile.get_compiled([policy_string, '--input_string'])
for formula in c.theory:
#logging.debug("Parsed {}".format(str(formula)))
def delete_obj(self, formula):
def delete(self, formula):
return self.modify(formula, is_insert=False)
def delete_string(self, policy_string):
c = compile.get_compiled([policy_string, '--input_string'])
for formula in c.theory:
def explain(self, query):
assert isinstance(query, compile.Atom), "Only have support for atoms"
return self.explain_aux(query, 0)
############### Interface implementation ###############
def log(self, table, msg, depth=0):
self.tracer.log(table, "MRT: " + msg, depth)
def explain_aux(self, query, depth):
self.log(query.table, "Explaining {}".format(str(query)), depth)
if query.is_negated():
return self.Proof(query, [])
return Proof(query, [])
# grab first local proof, since they're all equally good
localproofs = self.database.explain(query)
if len(localproofs) == 0: # base fact
return self.Proof(query, [])
return Proof(query, [])
localproof = localproofs[0]
rule_instance = localproof.rule.plug(localproof.binding)
subproofs = []
@ -602,24 +690,25 @@ class Runtime (object):
if subproof is None:
return None
return self.Proof(query, subproofs)
return Proof(query, subproofs)
def modify(self, formula, is_insert=True):
""" Event handler for arbitrary insertion/deletion (rules and facts). """
if formula.is_atom():
if self.delta_rules.is_view(formula.table):
return self.view_update_options(formula)
args = tuple([ for arg in formula.arguments])
self.modify_tuple(formula.table, args, is_insert=is_insert)
return None
assert (not self.is_view(formula.table),
"Cannot directly modify tables computed from other tables")
args = tuple([ for arg in formula.arguments])
formula.table, args, is_insert=is_insert)
return None
self.modify_rule(formula, is_insert=is_insert)
formula, is_insert=is_insert)
for delta_rule in compile.compute_delta_rules([formula]):
self.delta_rules.modify(delta_rule, is_insert=is_insert)
return None
def modify_rule(self, rule, is_insert):
def modify_tables_with_rule(self, rule, is_insert):
""" Add rule (not a DeltaRule) to collection and update
tables as appropriate. """
# don't have separate queue since inserting/deleting a rule doesn't generate any
@ -630,7 +719,7 @@ class Runtime (object):
self.process_new_bindings(bindings, rule.head, is_insert, rule)
def modify_tuple(self, table, row, is_insert):
def modify_tables_with_tuple(self, table, row, is_insert):
""" Event handler for a tuple insertion/deletion.
TABLE is the name of a table (a string).
TUPLE is a Python tuple.
@ -700,6 +789,9 @@ class Runtime (object):
self.process_new_bindings(new_bindings, delta_rule.head, insert_delete,
def is_view(self, x):
return self.delta_rules.is_view(x)
def process_new_bindings(self, bindings, atom, insert, original_rule):
""" For each of BINDINGS, apply to ATOM, and enqueue it as an insert if
INSERT is True and as a delete otherwise. """
@ -726,10 +818,164 @@ class Runtime (object):
############### View updates ###############
def view_update_options(self):
return [None]
## Runtime
class Runtime (object):
""" Runtime for the Congress policy language. Only have one instantiation
in practice, but using a class is natural and useful for testing. """
# Names of theories
CLASSIFY_THEORY = "classification"
SERVICE_THEORY = "service"
ACTION_THEORY = "action"
def __init__(self):
# tracer object
self.tracer = Tracer()
# collection of theories
self.theory = {}
self.theory[self.CLASSIFY_THEORY] = MaterializedRuleTheory()
self.theory[self.SERVICE_THEORY] = RuleTheory()
self.theory[self.ACTION_THEORY] = RuleTheory()
# Service/Action theories build upon Classify theory
def log(self, table, msg, depth=0):
self.tracer.log(table, "RT: " + msg, depth)
############### External interface ###############
def get_target(self, name):
if name is None:
assert name in self.theory, "Unknown target {}".format(name)
return self.theory[name]
def load_file(self, filename, target=None):
""" Compile the given FILENAME and insert each of the statements
into the runtime. """
compiler = compile.get_compiled([filename])
for formula in compiler.theory:
self.insert(formula, target=target)
def select(self, query, target=None):
""" Event handler for arbitrary queries. Returns the set of
all instantiated QUERY that are true. """
if isinstance(query, basestring):
return self.select_string(query, self.get_target(target))
elif isinstance(query, tuple):
return self.select_tuple(query, self.get_target(target))
return self.select_obj(query, self.get_target(target))
# Maybe implement one day
# def select_if(self, query, temporary_data):
# """ Event handler for hypothetical queries. Returns the set of
# all instantiated QUERYs that would be true IF
# TEMPORARY_DATA were true. """
# if isinstance(query, basestring):
# return self.select_if_string(query, temporary_data)
# else:
# return self.select_if_obj(query, temporary_data)
def explain(self, query, target=None):
""" Event handler for explanations. Given a ground query, return
a single proof that it belongs in the database. """
if isinstance(query, basestring):
return self.explain_string(query, self.get_target(target))
elif isinstance(query, tuple):
return self.explain_tuple(query, self.get_target(target))
return self.explain_obj(query, self.get_target(target))
def insert(self, formula, target=None):
""" Event handler for arbitrary insertion (rules and facts). """
if isinstance(formula, basestring):
return self.insert_string(formula, self.get_target(target))
elif isinstance(formula, tuple):
return self.insert_tuple(formula, self.get_target(target))
return self.insert_obj(formula, self.get_target(target))
def delete(self, formula, target=None):
""" Event handler for arbitrary deletion (rules and facts). """
if isinstance(formula, basestring):
return self.delete_string(formula, self.get_target(target))
elif isinstance(formula, tuple):
return self.delete_tuple(formula, self.get_target(target))
return self.delete_obj(formula, self.get_target(target))
############### Internal interface ###############
## Only arguments allowed to be strings are suffixed with _string
## All other arguments are instances of Theory, Atom, etc.
def select_obj(self, query, theory):
def select_string(self, policy_string, theory):
def str_tuple_atom (atom):
s = atom[0]
s += '('
s += ', '.join([str(x) for x in atom[1:]])
s += ')'
return s
c = compile.get_compiled(['--input_string', policy_string])
assert (len(c.theory) == 1,
"Queries can have only 1 statement: {}".format(
[str(x) for x in c.theory]))
results = self.select_obj(c.theory[0], theory)
return " ".join([str(x) for x in results])
def select_tuple(self, tuple, theory):
return self.select_obj(self.tuple_to_atom(tuple), theory)
def explain_obj(self, query, theory):
return theory.explain(query)
def explain_string(self, query_string, theory):
c = compile.get_compiled([query_string, '--input_string'])
assert len(c.theory) == 1, "Queries can have only 1 statement"
assert c.theory[0].is_atom(), "Queries must be atomic"
results = self.explain_obj(c.theory[0], theory)
return str(results)
def explain_tuple(self, tuple, theory):
self.explain_obj(self.tuple_to_atom(tuple), theory)
def insert_obj(self, formula, theory):
return theory.insert(formula)
def insert_string(self, policy_string, theory):
c = compile.get_compiled([policy_string, '--input_string'])
for formula in c.theory:
#logging.debug("Parsed {}".format(str(formula)))
self.insert_obj(formula, theory)
def insert_tuple(self, tuple, theory):
self.insert_obj(self.tuple_to_atom(tuple), theory)
def delete_obj(self, formula, theory):
def delete_string(self, policy_string, theory):
c = compile.get_compiled([policy_string, '--input_string'])
for formula in c.theory:
self.delete_obj(formula, theory)
def delete_tuple(self, tuple, theory):
self.delete_obj(self.tuple_to_atom(tuple), theory)
############### Helpers ###############
def tuple_to_atom(self, tuple):
table = tuple[0]
args = [compile.Term.create_from_python(arg) for arg in tuple[1:]]
return compile.Atom(table, args)
def plug(atom, binding, withtable=False):
""" Returns a tuple representing the arguments to ATOM after having

View File

@ -16,19 +16,21 @@ class TestRuntime(unittest.TestCase):
# compile source
if msg is not None:
c = compile.get_compiled([code, '--input_string'])
run = runtime.Runtime(c.delta_rules)
run = runtime.Runtime()
tracer = runtime.Tracer()
run.tracer = tracer
run.database.tracer = tracer
run.theory[run.CLASSIFY_THEORY].tracer = tracer
run.theory[run.SERVICE_THEORY].tracer = tracer
run.theory[run.ACTION_THEORY].tracer = tracer
return run
def insert(self, run, list):
run.modify_tuple(list[0], tuple(list[1:]), is_insert=True)
def insert(self, run, alist):
def delete(self, run, list):
run.modify_tuple(list[0], tuple(list[1:]), is_insert=False)
def delete(self, run, alist):
def string_to_database(self, string):
c = compile.get_compiled([string, '--input_string'])
@ -61,7 +63,8 @@ class TestRuntime(unittest.TestCase):
# extract correct answer from correct_database_code
logging.debug("** Checking {} **".format(msg))
correct_database = self.string_to_database(correct_database_code)
self.check_db_diffs(run.database, correct_database, msg)
correct_database, msg)
logging.debug("** Finished {} **".format(msg))
def check_equal(self, actual_code, correct_code, msg=None):
@ -109,7 +112,8 @@ class TestRuntime(unittest.TestCase):
def showdb(self, run):
logging.debug("Resulting DB: " + str(run.database))
logging.debug("Resulting DB: {}".format(
def test_database(self):
code = ("")