From 15b590b1a68a331a203d67a5f24582cd0f290c11 Mon Sep 17 00:00:00 2001 From: Tim Hinrichs Date: Mon, 12 Aug 2013 12:45:57 -0700 Subject: [PATCH] Runtime passed first test Used different Database/Runtime interface, since the original was the cause of so many bugs. Issue: # Change-Id: Idd87c649c5d0d169265939ab2336a49942dfb555 --- policy/compiler.py | 6 +- policy/runtime.py | 383 +++++++++++++++++++++++++++++++-------------- 2 files changed, 267 insertions(+), 122 deletions(-) diff --git a/policy/compiler.py b/policy/compiler.py index 7ddd3b19d..c04b95dd8 100755 --- a/policy/compiler.py +++ b/policy/compiler.py @@ -100,6 +100,9 @@ class Atom (object): def is_rule(self): return False + def variable_names(self): + return set([x.name for x in self.arguments if x.is_variable()]) + class Literal(Atom): """ Represents either a negated atom or an atom. """ def __init__(self, table, arguments, negated=False, location=None): @@ -201,6 +204,7 @@ class Compiler (object): runtime.print_delta_rules() # insert stuff # BUG: handle case where only 1 element in body + # BUG: self-joins require inserting data into database before computing updates runtime.tracer.trace('?') runtime.handle_insert('p', tuple([1])) print "**Final State**" @@ -308,7 +312,7 @@ class CongressSyntax (object): args.append(cls.create_term(antlr.children[i])) loc = Location(line=antlr.children[0].token.line, col=antlr.children[0].token.charPositionInLine) - return (antlr.children[0], args, loc) + return (antlr.children[0].getText(), args, loc) @classmethod def create_term(cls, antlr): diff --git a/policy/runtime.py b/policy/runtime.py index 38c13b205..67da02fae 100644 --- a/policy/runtime.py +++ b/policy/runtime.py @@ -2,11 +2,6 @@ import collections -# Todo: -# Add to Atom: is_negated, variable_names() -# Actually, make Literal inherit from Atom and change is_negated - - class Tracer(object): def __init__(self): self.expressions = [] @@ -58,18 +53,130 @@ class Event(object): return "{}({})".format(self.table, ",".join([str(x) for x in self.tuple])) +# class Database(object): +# class DictTuple(object): +# def __init__(self, binding, refcount=1): +# self.binding = binding +# self.refcount = refcount + +# def __eq__(self, other): +# return self.binding == other.binding + +# def __str__(self): +# return "".format( +# str(self.binding), self.refcount) + +# def matches(self, binding): +# print "Checking if tuple {} matches binding {}".format(str(self), str(binding)) +# for column_name in self.binding.keys(): +# if column_name not in binding: +# return False +# if self.binding[column_name] != binding[column_name]: +# return False +# print "Check succeeded with binding {}".format(str(binding)) +# return True + +# class Schema (object): +# def __init__(self, column_names): +# self.arguments = column_names +# def __str__(self): +# return str(self.arguments) + +# def __init__(self): +# self.data = {'p': [], 'q': [], 'r': [self.DictTuple({1: 1})]} +# # self.data = {'p': [self.DictTuple({1: 'a'}), +# # self.DictTuple({1: 'b'}), +# # self.DictTuple({1, 'c'})], +# # 'q': [self.DictTuple({1: 'b'}), +# # self.DictTuple({1: 'c'}), +# # self.DictTuple({1, 'd'})], +# # 'r': [self.DictTuple({1: 'c'}), +# # self.DictTuple({1: 'd'}), +# # self.DictTuple({1, 'e'})] +# # } +# self.schemas = {'p': Database.Schema([1]), +# 'q': Database.Schema([1]), +# 'r': Database.Schema([1])} + +# def __str__(self): +# def hash2str (h): +# s = "{" +# s += ", ".join(["{} : {}".format(str(key), str(h[key])) +# for key in h]) +# return s + +# def hashlist2str (h): +# strings = [] +# for key in h: +# s = "{} : ".format(key) +# s += '[' +# s += ', '.join([str(val) for val in h[key]]) +# s += ']' +# strings.append(s) +# return '{' + ", ".join(strings) + '}' + +# return "".format( +# hashlist2str(self.data), hash2str(self.schemas)) + +# def get_matches(self, table, binding, columns=None): +# print "Getting matches for table {} with binding {}".format( +# str(table), str(binding)) +# if table not in self.data: +# raise CongressRuntime("Table not found ".format(table)) +# result = [] +# for dicttuple in self.data[table]: +# print "Matching database tuple {}".format(str(dicttuple)) +# if dicttuple.matches(binding): +# result.append(dicttuple) +# return result + +# def insert(self, table, binding, refcount=1): +# if table not in self.data: +# raise CongressRuntime("Table not found ".format(table)) +# for dicttuple in self.data[table]: +# if dicttuple.binding == binding: +# dicttuple.refcount += refcount +# return +# self.data[table].append(self.DictTuple(binding, refcount)) + +# def delete(self, table, binding, refcount=1): +# if table not in self.data: +# raise CongressRuntime("Table not found ".format(table)) +# for dicttuple in self.data[table]: +# if dicttuple.binding == binding: +# dicttuple.refcount -= refcount +# if dicttuple.refcount < 0: +# raise CongressRuntime("Deleted more tuples than existed") +# return +# raise CongressRuntime("Deleted tuple that didn't exist") + + class Database(object): - class Dicttuple(object): - def __init__(self, binding, refcount=1): - self.binding = binding - self.refcount = refcount + class DBTuple(object): + def __init__(self, tuple): + self.tuple = tuple def __eq__(self, other): - return self.binding == other.binding + return self.tuple == other.tuple def __str__(self): - return "".format( - str(self.binding), self.refcount) + return str(self.tuple) + + def match(self, atom, binding): + print "Checking if tuple {} matches atom {} with binding {}".format( + str(self), str(atom), str(binding)) + if len(self.tuple) != len(atom.arguments): + return None + new_binding = {} + for i in xrange(0, len(atom.arguments)): + if atom.arguments[i].name in binding: + # check existing binding + if binding[atom.arguments[i].name] != self.tuple[i]: + return None + else: + new_binding[atom.arguments[i].name] = self.tuple[i] + print "Check succeeded with binding {}".format(str(new_binding)) + return new_binding class Schema (object): def __init__(self, column_names): @@ -78,53 +185,63 @@ class Database(object): return str(self.arguments) def __init__(self): - self.data = {'p': [], 'q': [], 'r': [Database.Dicttuple({1: 1})]} - # self.data = {'p': [Dicttuple({1: 'a'}), - # Dicttuple({1: 'b'}), - # Dicttuple({1, 'c'})], - # 'q': [Dicttuple({1: 'b'}), - # Dicttuple({1: 'c'}), - # Dicttuple({1, 'd'})], - # 'r': [Dicttuple({1: 'c'}), - # Dicttuple({1: 'd'}), - # Dicttuple({1, 'e'})] - # } + self.data = {'p': [], 'q': [], 'r': [self.DBTuple((1,))]} self.schemas = {'p': Database.Schema([1]), 'q': Database.Schema([1]), 'r': Database.Schema([1])} def __str__(self): - return "".format( - str(self.data), str(self.schemas)) + def hash2str (h): + s = "{" + s += ", ".join(["{} : {}".format(str(key), str(h[key])) + for key in h]) + return s - def get_matches(table, binding, columns=None): - if table not in self.data: + def hashlist2str (h): + strings = [] + for key in h: + s = "{} : ".format(key) + s += '[' + s += ', '.join([str(val) for val in h[key]]) + s += ']' + strings.append(s) + return '{' + ", ".join(strings) + '}' + + return "".format( + hashlist2str(self.data), hash2str(self.schemas)) + + def get_matches(self, atom, binding): + """ Returns a list of binding lists for the variables in ATOM + not bound in BINDING: one binding list for each tuple in + the database matching ATOM under BINDING. """ + if atom.table not in self.data: raise CongressRuntime("Table not found ".format(table)) result = [] - for dicttuple in self.data[table]: - for col in binding: - if dicttuple[col] == binding[col]: - result.append(dicttuple) + for tuple in self.data[atom.table]: + print "Matching database tuple {}".format(str(tuple)) + new_binding = tuple.match(atom, binding) + if new_binding is not None: + result.append(new_binding) + return result - def insert(table, binding, refcount): + def insert(self, table, tuple): + print "Inserting table {} tuple {} into DB".format(table, str(tuple)) if table not in self.data: raise CongressRuntime("Table not found ".format(table)) - for dicttuple in self.data[table]: - if dicttuple.binding == binding: - dicttuple.refcount += refcount - return - self.data[table].append(dicttuple(binding, refcount)) + # if already present, ignore + if any([dbtuple.tuple == tuple for dbtuple in self.data[table]]): + return + self.data[table].append(self.DBTuple(tuple)) - def delete(table, binding, refcount): + def delete(self, table, binding): + print "Deleting table {} tuple {} from DB".format(table, str(tuple)) if table not in self.data: raise CongressRuntime("Table not found ".format(table)) - for dicttuple in self.data[table]: - if dicttuple.binding == binding: - dicttuple.refcount -= refcount - if dicttuple.refcount < 0: - raise CongressRuntime("Deleted more tuples than existed") - return - raise CongressRuntime("Deleted tuple that didn't exist") + locs = [i for i in xrange(0,len(self.data[table])) + if self.data[table][i].tuple == tuple] + for loc in locs: + del self.data[loc] + # queue of events left to process queue = EventQueue() @@ -136,60 +253,85 @@ delta_rules = {} tracer = Tracer() def handle_insert(table, tuple): + """ Event handler for an insertion. """ if tracer.is_traced(table): print "Inserting into queue: {} with {}".format(table, str(tuple)) + # insert tuple into actual table before propagating or else self-join bug. + # Self-joins harder to fix when multi-threaded. queue.enqueue(Event(table, tuple, insert=True)) process_queue() def handle_delete(table, tuple): + """ Event handler for a deletion. """ if tracer.is_traced(table): print "Inserting into queue: {} with {}".format(table, str(tuple)) queue.enqueue(Event(table, tuple, insert=False)) process_queue() def process_queue(): + """ Toplevel evaluation routine. """ while len(queue) > 0: - propagate(queue.dequeue()) + event = queue.dequeue() + if event.is_insert(): + database.insert(event.table, event.tuple) + else: + database.delete(event.table, event.tuple) + propagate(event) def propagate(event): + """ Computes events generated by EVENT and the DELTA_RULES, + and enqueues them. """ if tracer.is_traced(event.table): print "Processing event: {}".format(str(event)) - if event.table not in delta_rules: + if event.table not in delta_rules.keys(): print "event.table: {}".format(event.table) print_delta_rules() print "No applicable delta rule" return for delta_rule in delta_rules[event.table]: - propagate_rule(delta_rule, event) + propagate_rule(event, delta_rule) def propagate_rule(event, delta_rule): + """ Compute and enqueue new events generated by EVENT and DELTA_RULE. """ assert(not delta_rule.trigger.is_negated()) if tracer.is_traced(event.table): print "Processing event {} with rule {}".format(str(event), str(delta_rule)) # compute tuples generated by event (either for insert or delete) binding_list = match(event.tuple, delta_rule.trigger) - vars_in_head = delta_rule.head.variable_names() - needed_vars = set(vars_in_head) - set(binding_list.keys()) - new_tuples = top_down_eval(needed_vars, delta_rule.body, binding_list) - no_dups = eliminate_dups_with_ref_counts(new_tuples) + if binding_list is None: + return + print "binding_list for event-tuple and delta_rule trigger: " + str(binding_list) + # vars_in_head = delta_rule.head.variable_names() + # print "vars_in_head: " + str(vars_in_head) + # needed_vars = set(vars_in_head) + # print "needed_vars: " + str(needed_vars) + new_bindings = top_down_eval(delta_rule.body, 0, binding_list) + print "new bindings after top-down: " + ",".join([str(x) for x in new_bindings]) # enqueue effects of Event - head_table = delta_rule.head.operator - for (tuple, refcount) in new_tuples.items(): - queue.enqueue(Event(table=head_table, tuple=tuple, insert=event.insert, - refcount=refcount)) + head_table = delta_rule.head.table + for new_binding in new_bindings: + queue.enqueue(Event(table=head_table, + tuple=plug(delta_rule.head, new_binding), + insert=event.insert)) - # insert tuple into actual table - if event.is_insert(): - database.insert(event.table, event.tuple) - else: - database.delete(event.table, event.tuple) +def plug(atom, binding): + """ Returns a tuple representing the arguments to ATOM after having + applied BINDING to the variables in ATOM. """ + result = [] + for i in xrange(0, len(atom.arguments)): + if atom.arguments[i].is_variable() and atom.arguments[i].name in binding: + result.append(binding[atom.arguments[i].name]) + else: + result.append(atom.arguments[i].name) + return tuple(result) def match(tuple, atom): - """ Returns a binding dictionary """ + """ Returns a binding dictionary that when applied to ATOM's arguments + gives exactly TUPLE, or returns None if no such binding exists. """ if len(tuple) != len(atom.arguments): - return False + return None binding = {} for i in xrange(0, len(tuple)): arg = atom.arguments[i] @@ -197,9 +339,9 @@ def match(tuple, atom): if arg.name in binding: oldval = binding[arg.name] if oldval != tuple[i]: - return False + return None else: - bindings[arg.name] = tuple[i] + binding[arg.name] = tuple[i] return binding def eliminate_dups_with_ref_counts(tuples): @@ -211,78 +353,77 @@ def eliminate_dups_with_ref_counts(tuples): refcounts[tuple] = 0 return refcounts -def top_down_eval(projection, atoms, atom_index, var_bindings): - """ Compute all tuples making the conjunction of the list of atoms ATOMS - true under the variable bindings of dictionary BINDING_LIST, - where we only care about the variables in the list PROJECTION. """ +def top_down_eval(atoms, atom_index, binding): + """ Compute all instances of ATOMS (from ATOM_INDEX and above) that + are true in the Database (after applying the dictionary binding + BINDING to ATOMs). Returns a list of dictionary bindings. """ atom = atoms[atom_index] if tracer.is_traced(atom.table): - print ("Top-down eval(projection={}, atoms={}, atom_index={}, " + print ("Top-down eval(atoms={}, atom_index={}, " "bindings={})").format( - str(projection), "[" + ",".join(str(x) for x in atoms) + "]", atom_index, - str(bindings)) - # compute name-binding for table lookup - (name_bindings, missing_names) = \ - var_bindings_to_named_bindings(atom, var_bindings) - needed_names = missing_names & \ - (all_variables(atoms, atom_index + 1) | projection) - needed_names = list(needed_names) - # do lookup and get name-bindings back - dictbindings = database.get_matches( - atom.table, binding_list, columns=needed_names) - # turn name-bindings into var-bindings - name_var_bindings = atom_arg_names(atom) - new_var_bindings = [] - for dictbinding in dictbindings: - var_binding = {} - for name_val in dictbinding.binding: - var_binding[name_var_bindings[name_val]] = dictbinding.binding[name_val] - # turn each resulting tuple into a new binding list + str(binding)) + data_bindings = database.get_matches(atom, binding) + print "data_bindings: " + str(data_bindings) + if len(data_bindings) == 0: + return [] results = [] - for binding in new_var_bindings: + for data_binding in data_bindings: # add this binding to var_bindings - var_bindings.update(binding) - # recurse - results.extend(TOP_DOWN_EVAL(projection, atoms, atom_index+1, binding)) + binding.update(data_binding) + if atom_index == len(atoms) - 1: # last element in atoms + # construct result + # output_binding = {} + # for var in projection: + # output_binding[var] = binding[var] + # results.append(output_binding) + results.append(dict(binding)) # need to copy + else: + # recurse + results.extend(top_down_eval(atoms, atom_index + 1, binding)) # remove this binding from var_bindings - for var in binding: - del var_bindings[var] + for var in data_binding: + del binding[var] + if tracer.is_traced(atom.table): + print "Return value: {}".format([str(x) for x in results]) + return results -def atom_arg_names(atom): - if atom.table not in database.schemas: - raise CongressRuntime("Table {} has no schema".format(atom.table)) - schema = database.schemas[atom.table] - if len(atom.arguments) != len(schema.arguments): - raise CongressRuntime("Atom {} has wrong number of arguments for " - " schema: {}".format(atom, str(schema))) - mapping = {} - for i in xrange(0, len(atom.arguments) - 1): - mapping[schema.arguments[i]] = atom.arguments[i] - return mapping +# def atom_arg_names(atom): +# if atom.table not in database.schemas: +# raise CongressRuntime("Table {} has no schema".format(atom.table)) +# schema = database.schemas[atom.table] +# if len(atom.arguments) != len(schema.arguments): +# raise CongressRuntime("Atom {} has wrong number of arguments for " +# " schema: {}".format(atom, str(schema))) +# mapping = {} +# for i in xrange(0, len(atom.arguments)): +# mapping[schema.arguments[i]] = atom.arguments[i] +# return mapping def all_variables(atoms, atom_index): vars = set() - for i in xrange(atom_index, len(atoms) - 1): + for i in xrange(atom_index, len(atoms)): vars |= atoms[i].variable_names() return vars -def var_bindings_to_named_bindings(atom, var_bindings): - new_bindings = {} - unbound_names = set() - schema = database.schemas[atom.table] - assert(len(schema.arguments) == len(atom.arguments)) - for i in xrange(0, len(atom.arguments) - 1): - term = atom.arguments[i] - if term.is_object(): - new_bindings[schema.arguments[i]] = term.name - elif term in binding_list: - new_bindings[schema.arguments[i]] = binding_list[term] - else: - unbound_names.add(schema.arguments[i]) - return (new_bindings, unbound_names) +# def var_bindings_to_named_bindings(atom, var_bindings): +# new_bindings = {} +# unbound_names = set() +# schema = database.schemas[atom.table] +# print "schema: " + str(schema.arguments) +# assert(len(schema.arguments) == len(atom.arguments)) +# for i in xrange(0, len(atom.arguments)): +# term = atom.arguments[i] +# if term.is_object(): +# new_bindings[schema.arguments[i]] = term.name +# elif term.name in var_bindings: +# new_bindings[schema.arguments[i]] = var_bindings[term.name] +# else: +# unbound_names.add(schema.arguments[i]) +# print "new_bindings: {}, unbound_names: {}".format(new_bindings, unbound_names) +# return (new_bindings, unbound_names) def print_delta_rules(): print "runtime's delta rules"