Reorganize policy engines

Previously we had a single directory for our Datalog implementation
and the domain agnostic policy engine built on top of that implementation.

This change splits those two conceptually separate things into separate
directories: policy for the Datalog implementation and policy_engines
for the engines built using the Datalog implementation.

Change-Id: I359801e6ec9c65740ab13611be7875fdab788947
This commit is contained in:
Tim Hinrichs 2015-02-20 14:40:41 -08:00
parent 989b519d48
commit f72698b397
18 changed files with 272 additions and 288 deletions

3
.gitignore vendored
View File

@ -3,7 +3,8 @@ Congress.tokens
/congress/policy/CongressLexer.py
/congress/policy/CongressParser.py
subunit.log
congress/tests/policy/snapshot/test
congress/tests/policy_engines/snapshot/test
/doc/html
*.py[cod]

View File

@ -26,7 +26,7 @@ from congress.db import db_policy_rules
from congress.dse import deepsix
from congress.exception import PolicyException
from congress.openstack.common import log as logging
from congress.policy import runtime
from congress.policy_engines import agnostic
LOG = logging.getLogger(__name__)
@ -173,7 +173,7 @@ class RuleModel(deepsix.deepSix):
policy_name = self.policy_name(context)
if policy_name not in self.engine.theory:
raise KeyError("Policy with ID '%s' does not exist", policy_name)
event = runtime.Event(
event = agnostic.Event(
formula=parsed_rule,
insert=insert,
target=policy_name)

View File

@ -22,7 +22,7 @@ from congress.dse import deepsix
from congress import exception
from congress.openstack.common import log as logging
from congress.policy import compile
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress import utils
import datetime
@ -984,11 +984,11 @@ class DataSourceDriver(deepsix.deepSix):
result = []
for row in to_add:
formula = compile.Literal.create_from_table_tuple(dataindex, row)
event = runtime.Event(formula=formula, insert=True)
event = agnostic.Event(formula=formula, insert=True)
result.append(event)
for row in to_del:
formula = compile.Literal.create_from_table_tuple(dataindex, row)
event = runtime.Event(formula=formula, insert=False)
event = agnostic.Event(formula=formula, insert=False)
result.append(event)
if len(result) == 0:
# Policy engine expects an empty update to be an init msg
@ -997,7 +997,7 @@ class DataSourceDriver(deepsix.deepSix):
result = None
text = "None"
else:
text = runtime.iterstr(result)
text = agnostic.iterstr(result)
self.log("prepush_processor for <%s> returning with %s items",
dataindex, text)
return result

View File

@ -54,7 +54,7 @@ def create(rootdir, statedir, config_override=None):
src_path = os.path.join(rootdir, "congress")
# add policy engine
engine_path = os.path.join(src_path, "policy/dsepolicy.py")
engine_path = os.path.join(src_path, "policy_engines/agnostic.py")
LOG.info("main::start() engine_path: %s", engine_path)
cage.loadModule("PolicyEngine", engine_path)
cage.createservice(

View File

@ -1,170 +0,0 @@
# Copyright (c) 2014 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from congress.dse import deepsix
from congress.openstack.common import log as logging
from congress.policy import compile
from congress.policy import runtime
LOG = logging.getLogger(__name__)
def d6service(name, keys, inbox, datapath, args):
return DseRuntime(name, keys, inbox, datapath, args)
def parse_tablename(tablename):
"""Given tablename returns (service, name)."""
pieces = tablename.split(':')
if len(pieces) == 1:
return (None, pieces[0])
else:
return (pieces[0], ':'.join(pieces[1:]))
class DseRuntime (runtime.Runtime, deepsix.deepSix):
def __init__(self, name, keys, inbox, datapath, args):
runtime.Runtime.__init__(self)
deepsix.deepSix.__init__(self, name, keys, inbox=inbox,
dataPath=datapath)
self.msg = None
self.last_policy_change = None
self.d6cage = args['d6cage']
self.rootdir = args['rootdir']
def extend_schema(self, service_name, schema):
newschema = {}
for key, value in schema:
newschema[service_name + ":" + key] = value
super(DseRuntime, self).extend_schema(self, newschema)
def receive_msg(self, msg):
self.log("received msg %s", msg)
self.msg = msg
def receive_data(self, msg):
"""Event handler for when a dataservice publishes data.
That data can either be the full table (as a list of tuples)
or a delta (a list of Events).
"""
self.log("received data msg %s", msg)
# if empty data, assume it is an init msg, since noop otherwise
if len(msg.body.data) == 0:
self.receive_data_full(msg)
else:
# grab an item from any iterable
dataelem = iter(msg.body.data).next()
if isinstance(dataelem, runtime.Event):
self.receive_data_update(msg)
else:
self.receive_data_full(msg)
def receive_data_full(self, msg):
"""Handler for when dataservice publishes full table."""
self.log("received full data msg for %s: %s",
msg.header['dataindex'], runtime.iterstr(msg.body.data))
tablename = msg.header['dataindex']
service = msg.replyTo
# Use a generator to avoid instantiating all these Facts at once.
literals = (compile.Fact(tablename, row) for row in msg.body.data)
self.initialize_tables([tablename], literals, target=service)
self.log("full data msg for %s", tablename)
def receive_data_update(self, msg):
"""Handler for when dataservice publishes a delta."""
self.log("received update data msg for %s: %s",
msg.header['dataindex'], runtime.iterstr(msg.body.data))
events = msg.body.data
for event in events:
assert compile.is_atom(event.formula), (
"receive_data_update received non-atom: " +
str(event.formula))
# prefix tablename with data source
event.target = msg.replyTo
(permitted, changes) = self.update(events)
if not permitted:
raise runtime.CongressRuntime(
"Update not permitted." + '\n'.join(str(x) for x in changes))
else:
tablename = msg.header['dataindex']
service = msg.replyTo
self.log("update data msg for %s from %s caused %d "
"changes: %s", tablename, service, len(changes),
runtime.iterstr(changes))
if tablename in self.theory[service].tablenames():
rows = self.theory[service].content([tablename])
self.log("current table: %s", runtime.iterstr(rows))
def receive_policy_update(self, msg):
self.log("received policy-update msg %s",
runtime.iterstr(msg.body.data))
# update the policy and subscriptions to data tables.
self.last_policy_change = self.process_policy_update(msg.body.data)
def process_policy_update(self, events):
self.log("process_policy_update %s" % events)
oldtables = self.tablenames()
LOG.debug("Old Tables: %s" % oldtables)
result = self.update(events)
newtables = self.tablenames()
LOG.debug("New Tables: %s" % newtables)
self.update_table_subscriptions(oldtables, newtables)
return result
def initialize_table_subscriptions(self):
"""Initialize table subscription.
Once policies have all been loaded, this function subscribes to
all the necessary tables. See UPDATE_TABLE_SUBSCRIPTIONS as well.
"""
self.update_table_subscriptions(set(), self.tablenames())
def update_table_subscriptions(self, oldtables, newtables):
"""Update table subscription.
Change the subscriptions from OLDTABLES to NEWTABLES, ensuring
to load all the appropriate services.
"""
add = newtables - oldtables
rem = oldtables - newtables
self.log("Tables:: Old: %s, new: %s, add: %s, rem: %s",
oldtables, newtables, add, rem)
# subscribe to the new tables (loading services as required)
for table in add:
if not self.reserved_tablename(table):
(service, tablename) = parse_tablename(table)
if service is not None:
self.log("Subscribing to new (service, table): (%s, %s)",
service, tablename)
self.subscribe(service, tablename,
callback=self.receive_data)
# TODO(thinrichs): figure out scheme for removing old services once
# their tables are no longer needed. Leaving them around is
# basically a memory leak, but deleting them too soon
# might mean fat-fingering policy yields large performance hits
# (e.g. if we need to re-sync entirely). Probably create a queue
# of these tables, keep them up to date, and gc them after
# some amount of time.
# unsubscribe from the old tables
for table in rem:
(service, tablename) = parse_tablename(table)
if service is not None:
self.log("Unsubscribing to new (service, table): (%s, %s)",
service, tablename)
self.unsubscribe(service, tablename)

View File

View File

@ -14,6 +14,8 @@
#
import os
from congress.dse import deepsix
from congress.exception import CongressException
from congress.exception import PolicyException
from congress.openstack.common import log as logging
from congress.policy.base import ACTION_POLICY_TYPE
@ -933,3 +935,154 @@ class ExperimentalRuntime (Runtime):
continue
if self.logger is not None:
self.logger.info("%s", action)
##############################################################################
# Engine that operates on the DSE
##############################################################################
def d6service(name, keys, inbox, datapath, args):
return DseRuntime(name, keys, inbox, datapath, args)
def parse_tablename(tablename):
"""Given tablename returns (service, name)."""
pieces = tablename.split(':')
if len(pieces) == 1:
return (None, pieces[0])
else:
return (pieces[0], ':'.join(pieces[1:]))
class DseRuntime (Runtime, deepsix.deepSix):
def __init__(self, name, keys, inbox, datapath, args):
Runtime.__init__(self)
deepsix.deepSix.__init__(self, name, keys, inbox=inbox,
dataPath=datapath)
self.msg = None
self.last_policy_change = None
self.d6cage = args['d6cage']
self.rootdir = args['rootdir']
def extend_schema(self, service_name, schema):
newschema = {}
for key, value in schema:
newschema[service_name + ":" + key] = value
super(DseRuntime, self).extend_schema(self, newschema)
def receive_msg(self, msg):
self.log("received msg %s", msg)
self.msg = msg
def receive_data(self, msg):
"""Event handler for when a dataservice publishes data.
That data can either be the full table (as a list of tuples)
or a delta (a list of Events).
"""
self.log("received data msg %s", msg)
# if empty data, assume it is an init msg, since noop otherwise
if len(msg.body.data) == 0:
self.receive_data_full(msg)
else:
# grab an item from any iterable
dataelem = iter(msg.body.data).next()
if isinstance(dataelem, Event):
self.receive_data_update(msg)
else:
self.receive_data_full(msg)
def receive_data_full(self, msg):
"""Handler for when dataservice publishes full table."""
self.log("received full data msg for %s: %s",
msg.header['dataindex'], iterstr(msg.body.data))
tablename = msg.header['dataindex']
service = msg.replyTo
# Use a generator to avoid instantiating all these Facts at once.
literals = (compile.Fact(tablename, row) for row in msg.body.data)
self.initialize_tables([tablename], literals, target=service)
self.log("full data msg for %s", tablename)
def receive_data_update(self, msg):
"""Handler for when dataservice publishes a delta."""
self.log("received update data msg for %s: %s",
msg.header['dataindex'], iterstr(msg.body.data))
events = msg.body.data
for event in events:
assert compile.is_atom(event.formula), (
"receive_data_update received non-atom: " +
str(event.formula))
# prefix tablename with data source
event.target = msg.replyTo
(permitted, changes) = self.update(events)
if not permitted:
raise CongressException(
"Update not permitted." + '\n'.join(str(x) for x in changes))
else:
tablename = msg.header['dataindex']
service = msg.replyTo
self.log("update data msg for %s from %s caused %d "
"changes: %s", tablename, service, len(changes),
iterstr(changes))
if tablename in self.theory[service].tablenames():
rows = self.theory[service].content([tablename])
self.log("current table: %s", iterstr(rows))
def receive_policy_update(self, msg):
self.log("received policy-update msg %s",
iterstr(msg.body.data))
# update the policy and subscriptions to data tables.
self.last_policy_change = self.process_policy_update(msg.body.data)
def process_policy_update(self, events):
self.log("process_policy_update %s" % events)
oldtables = self.tablenames()
result = self.update(events)
newtables = self.tablenames()
self.update_table_subscriptions(oldtables, newtables)
return result
def initialize_table_subscriptions(self):
"""Initialize table subscription.
Once policies have all been loaded, this function subscribes to
all the necessary tables. See UPDATE_TABLE_SUBSCRIPTIONS as well.
"""
self.update_table_subscriptions(set(), self.tablenames())
def update_table_subscriptions(self, oldtables, newtables):
"""Update table subscription.
Change the subscriptions from OLDTABLES to NEWTABLES, ensuring
to load all the appropriate services.
"""
add = newtables - oldtables
rem = oldtables - newtables
self.log("Tables:: Old: %s, new: %s, add: %s, rem: %s",
oldtables, newtables, add, rem)
# subscribe to the new tables (loading services as required)
for table in add:
if not self.reserved_tablename(table):
(service, tablename) = parse_tablename(table)
if service is not None:
self.log("Subscribing to new (service, table): (%s, %s)",
service, tablename)
self.subscribe(service, tablename,
callback=self.receive_data)
# TODO(thinrichs): figure out scheme for removing old services once
# their tables are no longer needed. Leaving them around is
# basically a memory leak, but deleting them too soon
# might mean fat-fingering policy yields large performance hits
# (e.g. if we need to re-sync entirely). Probably create a queue
# of these tables, keep them up to date, and gc them after
# some amount of time.
# unsubscribe from the old tables
for table in rem:
(service, tablename) = parse_tablename(table)
if service is not None:
self.log("Unsubscribing to new (service, table): (%s, %s)",
service, tablename)
self.unsubscribe(service, tablename)

View File

@ -13,8 +13,8 @@
# under the License.
#
import congress.dse.d6cage
import congress.policy.compile as compile
import congress.policy.runtime as runtime
from congress.policy import compile
from congress.policy_engines import agnostic
from congress.tests import base
import congress.tests.helper as helper
@ -75,7 +75,7 @@ class TestDSE(base.TestCase):
policy.subscribe('data', 'p', callback=policy.receive_data)
formula = policy.parse1('p(1)')
# sending a single Insert. (Default for Event is Insert.)
data.publish('p', [runtime.Event(formula)])
data.publish('p', [agnostic.Event(formula)])
helper.retry_check_db_equal(policy, 'data:p(x)', 'data:p(1)')
def test_policy_tables(self):
@ -101,18 +101,18 @@ class TestDSE(base.TestCase):
callback=policy.receive_policy_update)
# simulate API call for insertion of policy statements
formula = policy.parse1('p(x) :- data:q(x)')
api.publish('policy-update', [runtime.Event(formula)])
api.publish('policy-update', [agnostic.Event(formula)])
helper.retry_check_nonempty_last_policy_change(policy)
# simulate data source publishing to q
formula = policy.parse1('q(1)')
data.publish('q', [runtime.Event(formula)])
data.publish('q', [agnostic.Event(formula)])
helper.retry_check_db_equal(policy, 'data:q(x)', 'data:q(1)')
# check that policy did the right thing with data
e = helper.db_equal(policy.select('p(x)'), 'p(1)')
self.assertTrue(e, 'Policy insert')
# check that publishing into 'p' does not work
formula = policy.parse1('p(3)')
data.publish('p', [runtime.Event(formula)])
data.publish('p', [agnostic.Event(formula)])
# can't actually check that the update for p does not arrive
# so instead wait a bit and check
helper.pause()

View File

@ -21,8 +21,8 @@ import retrying
from congress.openstack.common import log as logging
from congress.policy import compile
from congress.policy import runtime
from congress.policy import unify
from congress.policy_engines import agnostic
LOG = logging.getLogger(__name__)
@ -63,8 +63,8 @@ def data_module_path(file):
def policy_module_path():
"""Return path to policy engine module."""
path = source_path()
path = os.path.join(path, "policy")
path = os.path.join(path, "dsepolicy.py")
path = os.path.join(path, "policy_engines")
path = os.path.join(path, "agnostic.py")
return path
@ -167,8 +167,8 @@ def datalog_equal(actual_code, correct_code,
def db_equal(actual_string, correct_string, output_diff=True):
"""Check if two strings representing data theories are the same."""
actual = runtime.string_to_database(actual_string)
correct = runtime.string_to_database(correct_string)
actual = agnostic.string_to_database(actual_string)
correct = agnostic.string_to_database(correct_string)
return check_db_diffs(actual, correct, output_diff=output_diff)

View File

@ -18,7 +18,7 @@ from congress.exception import PolicyException
from congress.openstack.common import log as logging
from congress.policy.builtin import congressbuiltin
from congress.policy import compile
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress.tests import base
from congress.tests import helper
@ -245,10 +245,10 @@ class TestTheories(base.TestCase):
code = ""
if target is None:
target = NREC_THEORY
run = runtime.Runtime()
run.theory[NREC_THEORY] = runtime.NonrecursiveRuleTheory(
run = agnostic.Runtime()
run.theory[NREC_THEORY] = agnostic.NonrecursiveRuleTheory(
name="Nonrecursive", abbr="NRT")
run.theory[MAT_THEORY] = runtime.MaterializedViewTheory(
run.theory[MAT_THEORY] = agnostic.MaterializedViewTheory(
name="Materialized", abbr="MAT")
run.debug_mode()
run.insert(code, target=target)

View File

@ -14,7 +14,7 @@
#
from congress.exception import PolicyException
from congress.policy import compile
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress.tests import base
from congress.tests import helper
@ -30,7 +30,7 @@ class TestParser(base.TestCase):
"""Test column-references with low-level checks."""
# do the first one the painful way, to ensure the parser
# is doing something reasonable.
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('nova')
nova_schema = compile.Schema({'q': ('id', 'name', 'status')})
run.set_schema('nova', nova_schema, complete=True)
@ -54,7 +54,7 @@ class TestParser(base.TestCase):
def test_column_references_atom(self):
"""Test column references occurring in a single atom in a rule."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('nova')
nova_schema = compile.Schema({'q': ('id', 'name', 'status')})
run.set_schema('nova', nova_schema, complete=True)
@ -132,7 +132,7 @@ class TestParser(base.TestCase):
def test_column_references_atom_errors(self):
"""Test invalid column references occurring in a single atom."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('nova')
schema = compile.Schema({'q': ('id', 'name', 'status'),
'r': ('id', 'age', 'weight')})
@ -194,7 +194,7 @@ class TestParser(base.TestCase):
def test_column_references_multiple_atoms(self):
"""Test column references occurring in multiple atoms in a rule."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('nova')
schema = compile.Schema({'q': ('id', 'name', 'status'),
'r': ('id', 'age', 'weight')})
@ -356,7 +356,7 @@ class TestCompiler(base.TestCase):
def test_module_schemas(self):
"""Test that rules are properly checked against module schemas."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('mod1')
run.create_policy('mod2')
run.set_schema('mod1', compile.Schema({'p': (1, 2, 3), 'q': (1,)}),

View File

@ -17,7 +17,7 @@ from congress.openstack.common import log as logging
from congress.policy.base import DATABASE_POLICY_TYPE
from congress.policy.base import MATERIALIZED_POLICY_TYPE
from congress.policy import compile
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress.tests import base
from congress.tests import helper
@ -38,7 +38,7 @@ class TestRuntime(base.TestCase):
code = ""
if target is None:
target = MAT_THEORY
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy(MAT_THEORY, kind=MATERIALIZED_POLICY_TYPE)
run.create_policy(DB_THEORY, kind=DATABASE_POLICY_TYPE)
# ensure inserts without target go to MAT_THEORY

View File

@ -15,7 +15,7 @@
from congress.openstack.common import log as logging
from congress.policy.base import DATABASE_POLICY_TYPE
from congress.policy.base import NONRECURSIVE_POLICY_TYPE
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress.tests import base
from congress.tests import helper
@ -34,7 +34,7 @@ class TestRuntime(base.TestCase):
code = ""
if target is None:
target = NREC_THEORY
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy(NREC_THEORY, kind=NONRECURSIVE_POLICY_TYPE)
run.create_policy(DB_THEORY, kind=DATABASE_POLICY_TYPE)
run.debug_mode()
@ -476,7 +476,7 @@ class TestRuntime(base.TestCase):
def test_modals(self):
"""Test that the modal operators work properly."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.debug_mode()
LOG.debug("print me")
run.create_policy("test")

View File

@ -17,8 +17,8 @@ import os
from congress.openstack.common import log as logging
from congress.policy import compile
from congress.policy import runtime
from congress.policy import unify
from congress.policy_engines import agnostic
from congress.tests import base
LOG = logging.getLogger(__name__)
@ -42,10 +42,10 @@ class TestRuntime(base.TestCase):
code = ""
if target is None:
target = MAT_THEORY
run = runtime.Runtime()
run.theory[NREC_THEORY] = runtime.NonrecursiveRuleTheory()
run.theory[DB_THEORY] = runtime.Database()
run.theory[MAT_THEORY] = runtime.MaterializedViewTheory()
run = agnostic.Runtime()
run.theory[NREC_THEORY] = agnostic.NonrecursiveRuleTheory()
run.theory[DB_THEORY] = agnostic.Database()
run.theory[MAT_THEORY] = agnostic.MaterializedViewTheory()
run.debug_mode()
run.insert(code, target=target)
return run
@ -59,7 +59,7 @@ class TestRuntime(base.TestCase):
self.open(msg)
db_class = run.theory[MAT_THEORY].database
# self.showdb(run)
correct = runtime.string_to_database(correct_database_code)
correct = agnostic.string_to_database(correct_database_code)
self.check_db_diffs(db_class, correct, msg)
self.close(msg)
@ -71,7 +71,7 @@ class TestRuntime(base.TestCase):
"""
# extract correct answer from correct_database_code
self.open(msg)
correct_database = runtime.string_to_database(correct_database_code)
correct_database = agnostic.string_to_database(correct_database_code)
self.check_db_diffs(run.theory[DB_THEORY],
correct_database, msg)
self.close(msg)
@ -227,12 +227,12 @@ class TestRuntime(base.TestCase):
permitted, errors = run.insert(ac_code, target=acth)
self.assertTrue(permitted,
"Error in access control policy: {}".format(
runtime.iterstr(errors)))
agnostic.iterstr(errors)))
clsth = run.CLASSIFY_THEORY
permitted, errors = run.insert(class_code, target=clsth)
self.assertTrue(permitted, "Error in classifier policy: {}".format(
runtime.iterstr(errors)))
agnostic.iterstr(errors)))
return run
def check_true(run, query, support='', msg=None):
@ -279,7 +279,7 @@ class TestRuntime(base.TestCase):
def test_enforcement(self):
"""Test enforcement."""
def prep_runtime(enforce_theory, action_theory, class_theory):
run = runtime.Runtime()
run = agnostic.Runtime()
run.insert(enforce_theory, target=run.ENFORCEMENT_THEORY)
run.insert(action_theory, target=run.ACTION_THEORY)
run.insert(class_theory, target=run.CLASSIFY_THEORY)
@ -308,7 +308,7 @@ class TestRuntime(base.TestCase):
full_path = os.path.realpath(__file__)
path = os.path.dirname(full_path)
neutron_path = path + "/../../../examples/neutron.action"
run = runtime.Runtime()
run = agnostic.Runtime()
run.debug_mode()
permitted, errs = run.load_file(neutron_path, target=run.ACTION_THEORY)
if not permitted:

View File

@ -21,7 +21,7 @@ from congress.policy.base import DATABASE_POLICY_TYPE
from congress.policy.base import MATERIALIZED_POLICY_TYPE
from congress.policy.base import NONRECURSIVE_POLICY_TYPE
from congress.policy.compile import Fact
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress.tests import base
from congress.tests import helper
@ -40,9 +40,9 @@ class TestRuntime(base.TestCase):
def test_theory_inclusion(self):
"""Test evaluation routines when one theory includes another."""
# spread out across inclusions
th1 = runtime.NonrecursiveRuleTheory()
th2 = runtime.NonrecursiveRuleTheory()
th3 = runtime.NonrecursiveRuleTheory()
th1 = agnostic.NonrecursiveRuleTheory()
th2 = agnostic.NonrecursiveRuleTheory()
th3 = agnostic.NonrecursiveRuleTheory()
th1.includes.append(th2)
th2.includes.append(th3)
@ -56,7 +56,7 @@ class TestRuntime(base.TestCase):
'p(1)', 'Data spread across inclusions')
def test_get_arity(self):
th = runtime.NonrecursiveRuleTheory()
th = agnostic.NonrecursiveRuleTheory()
th.insert(helper.str2form('q(x) :- p(x)'))
th.insert(helper.str2form('p(x) :- s(x)'))
self.assertEqual(th.get_arity('p'), 1)
@ -70,13 +70,13 @@ class TestRuntime(base.TestCase):
e = helper.datalog_equal(actual, correct)
self.assertTrue(e)
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('th1')
run.create_policy('th2')
events1 = [runtime.Event(formula=x, insert=True, target='th1')
events1 = [agnostic.Event(formula=x, insert=True, target='th1')
for x in helper.str2pol("p(1) p(2) q(1) q(3)")]
events2 = [runtime.Event(formula=x, insert=True, target='th2')
events2 = [agnostic.Event(formula=x, insert=True, target='th2')
for x in helper.str2pol("r(1) r(2) t(1) t(4)")]
run.update(events1 + events2)
@ -86,8 +86,8 @@ class TestRuntime(base.TestCase):
check_equal(run.select('t(x)', 'th2'), 't(1) t(4)')
def test_initialize_tables(self):
"""Test initialize_tables() functionality of Runtime."""
run = runtime.Runtime()
"""Test initialize_tables() functionality of agnostic."""
run = agnostic.Runtime()
run.create_policy('test')
run.insert('p(1) p(2)')
facts = [Fact('p', (3,)), Fact('p', (4,))]
@ -97,7 +97,7 @@ class TestRuntime(base.TestCase):
def test_dump_load(self):
"""Test if dumping/loading theories works properly."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('test')
run.debug_mode()
policy = ('p(4,"a","bcdef ghi", 17.1) '
@ -109,7 +109,7 @@ class TestRuntime(base.TestCase):
path = os.path.dirname(full_path)
path = os.path.join(path, "snapshot")
run.dump_dir(path)
run = runtime.Runtime()
run = agnostic.Runtime()
run.load_dir(path)
e = helper.datalog_equal(run.theory['test'].content_string(),
policy, 'Service theory dump/load')
@ -118,7 +118,7 @@ class TestRuntime(base.TestCase):
def test_single_policy(self):
"""Test ability to create/delete single policies."""
# single policy
run = runtime.Runtime()
run = agnostic.Runtime()
original = run.policy_names()
run.create_policy('test1')
run.insert('p(x) :- q(x)', 'test1')
@ -134,7 +134,7 @@ class TestRuntime(base.TestCase):
def test_multi_policy(self):
"""Test ability to create/delete multiple policies."""
# multiple policies
run = runtime.Runtime()
run = agnostic.Runtime()
original = run.policy_names()
run.create_policy('test2')
run.create_policy('test3')
@ -157,28 +157,28 @@ class TestRuntime(base.TestCase):
def test_policy_types(self):
"""Test types for multiple policies."""
# policy types
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('test1', kind=NONRECURSIVE_POLICY_TYPE)
self.assertTrue(isinstance(run.policy_object('test1'),
runtime.NonrecursiveRuleTheory),
agnostic.NonrecursiveRuleTheory),
'Nonrecursive policy addition')
run.create_policy('test2', kind=ACTION_POLICY_TYPE)
self.assertTrue(isinstance(run.policy_object('test2'),
runtime.ActionTheory),
agnostic.ActionTheory),
'Action policy addition')
run.create_policy('test3', kind=DATABASE_POLICY_TYPE)
self.assertTrue(isinstance(run.policy_object('test3'),
runtime.Database),
agnostic.Database),
'Database policy addition')
run.create_policy('test4', kind=MATERIALIZED_POLICY_TYPE)
self.assertTrue(isinstance(run.policy_object('test4'),
runtime.MaterializedViewTheory),
agnostic.MaterializedViewTheory),
'Materialized policy addition')
def test_policy_errors(self):
"""Test errors for multiple policies."""
# errors
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('existent')
self.assertRaises(KeyError, run.create_policy, 'existent')
self.assertRaises(KeyError, run.delete_policy, 'nonexistent')
@ -189,7 +189,7 @@ class TestMultipolicyRules(base.TestCase):
def test_external(self):
"""Test ability to write rules that span multiple policies."""
# External theory
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('test1')
run.insert('q(1)', target='test1')
run.insert('q(2)', target='test1')
@ -201,7 +201,7 @@ class TestMultipolicyRules(base.TestCase):
def test_multi_external(self):
"""Test multiple rules that span multiple policies."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.debug_mode()
run.create_policy('test1')
run.create_policy('test2')
@ -217,7 +217,7 @@ class TestMultipolicyRules(base.TestCase):
def test_external_current(self):
"""Test ability to write rules that span multiple policies."""
# External theory plus current theory
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('test1')
run.insert('q(1)', target='test1')
run.insert('q(2)', target='test1')
@ -232,7 +232,7 @@ class TestMultipolicyRules(base.TestCase):
def test_ignore_local(self):
"""Test ability to write rules that span multiple policies."""
# Local table ignored
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('test1')
run.insert('q(1)', target='test1')
run.insert('q(2)', target='test1')
@ -249,7 +249,7 @@ class TestMultipolicyRules(base.TestCase):
def test_local(self):
"""Test ability to write rules that span multiple policies."""
# Local table used
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('test1')
run.insert('q(1)', target='test1')
run.insert('q(2)', target='test1')
@ -263,7 +263,7 @@ class TestMultipolicyRules(base.TestCase):
def test_multiple_external(self):
"""Test ability to write rules that span multiple policies."""
# Multiple external theories
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy('test1')
run.insert('q(1)', target='test1')
run.insert('q(2)', target='test1')
@ -281,7 +281,7 @@ class TestMultipolicyRules(base.TestCase):
def test_multiple_levels_external(self):
"""Test ability to write rules that span multiple policies."""
# Multiple levels of external theories
run = runtime.Runtime()
run = agnostic.Runtime()
run.debug_mode()
run.create_policy('test1')
run.insert('p(x) :- test2:q(x), test3:q(x)', target='test1')
@ -300,7 +300,7 @@ class TestMultipolicyRules(base.TestCase):
def test_multipolicy_head(self):
"""Test SELECT with different policy in the head."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.debug_mode()
run.create_policy('test1', kind='action')
run.create_policy('test2', kind='action')
@ -314,7 +314,7 @@ class TestMultipolicyRules(base.TestCase):
def test_multipolicy_normal_errors(self):
"""Test errors arising from rules in multiple policies."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.debug_mode()
run.create_policy('test1')
@ -358,7 +358,7 @@ class TestMultipolicyRules(base.TestCase):
def test_multipolicy_action_errors(self):
"""Test errors arising from rules in action policies."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.debug_mode()
run.create_policy('test1', kind='action')
@ -382,7 +382,7 @@ class TestMultipolicyRules(base.TestCase):
def test_dependency_graph(self):
"""Test that dependency graph gets updated correctly."""
run = runtime.Runtime()
run = agnostic.Runtime()
run.debug_mode()
g = run.global_dependency_graph
@ -407,8 +407,8 @@ class TestMultipolicyRules(base.TestCase):
self.assertTrue(g.edge_in('test:p', 'test:s', False))
self.assertTrue(g.edge_in('test:q', 'nova:r', False))
run.update([runtime.Event(helper.str2form('p(x) :- q(x), nova:q(x)'),
target='test')])
run.update([agnostic.Event(helper.str2form('p(x) :- q(x), nova:q(x)'),
target='test')])
self.assertTrue(g.edge_in('test:p', 'nova:q', False))
self.assertTrue(g.edge_in('test:p', 'test:q', False))
self.assertTrue(g.edge_in('test:p', 'test:s', False))
@ -424,7 +424,7 @@ class TestSimulate(base.TestCase):
code = ""
if target is None:
target = self.DEFAULT_THEORY
run = runtime.Runtime()
run = agnostic.Runtime()
run.create_policy(self.DEFAULT_THEORY, abbr='default')
run.create_policy(self.ACTION_THEORY, abbr='action', kind='action')
if theories:
@ -440,12 +440,12 @@ class TestSimulate(base.TestCase):
actth = self.ACTION_THEORY
permitted, errors = run.insert(action_code, target=actth)
self.assertTrue(permitted, "Error in action policy: {}".format(
runtime.iterstr(errors)))
agnostic.iterstr(errors)))
defth = self.DEFAULT_THEORY
permitted, errors = run.insert(class_code, target=defth)
self.assertTrue(permitted, "Error in classifier policy: {}".format(
runtime.iterstr(errors)))
agnostic.iterstr(errors)))
return run

View File

@ -20,7 +20,7 @@ from congress.policy import base
from congress.policy import compile
from congress.policy.compile import Fact
from congress.policy.compile import Literal
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress.tests import base as testbase
from congress.tests import helper
@ -53,49 +53,49 @@ class TestRuntimePerformance(testbase.TestCase):
def setUp(self):
super(TestRuntimePerformance, self).setUp()
self._runtime = runtime.Runtime()
self._runtime.create_policy(NREC_THEORY,
kind=base.NONRECURSIVE_POLICY_TYPE)
self._runtime.create_policy(DB_THEORY, kind=base.DATABASE_POLICY_TYPE)
self._runtime.debug_mode()
self._runtime.insert('', target=NREC_THEORY)
self._agnostic = agnostic.Runtime()
self._agnostic.create_policy(NREC_THEORY,
kind=base.NONRECURSIVE_POLICY_TYPE)
self._agnostic.create_policy(DB_THEORY, kind=base.DATABASE_POLICY_TYPE)
self._agnostic.debug_mode()
self._agnostic.insert('', target=NREC_THEORY)
def _create_event(self, table, tuple_, insert, target):
return runtime.Event(Literal.create_from_table_tuple(table, tuple_),
insert=insert, target=target)
return agnostic.Event(Literal.create_from_table_tuple(table, tuple_),
insert=insert, target=target)
def _create_large_tables(self, n, theory):
facts = [Fact('p', (i, j, k))
for i in range(n) for k in range(n) for j in range(n)]
facts.extend(Fact('q', (i,)) for i in range(n))
self._runtime.initialize_tables(['p', 'q'], facts, theory)
self._agnostic.initialize_tables(['p', 'q'], facts, theory)
def test_insert_nonrecursive(self):
MAX = 100
th = NREC_THEORY
for i in range(MAX):
self._runtime.insert('r(%d)' % i, th)
self._agnostic.insert('r(%d)' % i, th)
def test_insert_database(self):
MAX = 100
th = DB_THEORY
for i in range(MAX):
self._runtime.insert('r(%d)' % i, th)
self._agnostic.insert('r(%d)' % i, th)
def test_update_nonrecursive(self):
MAX = 10000
th = NREC_THEORY
updates = [self._create_event('r', (i,), True, th)
for i in range(MAX)]
self._runtime.update(updates)
self._agnostic.update(updates)
def test_update_database(self):
MAX = 1000
th = DB_THEORY
updates = [self._create_event('r', (i,), True, th)
for i in range(MAX)]
self._runtime.update(updates)
self._agnostic.update(updates)
def test_indexing(self):
MAX = 100
@ -104,14 +104,14 @@ class TestRuntimePerformance(testbase.TestCase):
for table in ('a', 'b', 'c'):
updates = [self._create_event(table, (i,), True, th)
for i in range(MAX)]
self._runtime.update(updates)
self._agnostic.update(updates)
# With indexing, this query should take O(n) time where n is MAX.
# Without indexing, this query will take O(n^3).
self._runtime.insert('d(x) :- a(x), b(x), c(x)', th)
self._agnostic.insert('d(x) :- a(x), b(x), c(x)', th)
ans = ' '.join(['d(%d)' % i for i in range(MAX)])
self.assertTrue(helper.datalog_equal(self._runtime.select('d(x)',
th), ans))
self.assertTrue(helper.datalog_equal(self._agnostic.select('d(x)',
th), ans))
def test_runtime_initialize_tables(self):
MAX = 700
@ -120,7 +120,7 @@ class TestRuntimePerformance(testbase.TestCase):
for i in range(MAX))
th = NREC_THEORY
self._runtime.initialize_tables(['p'], facts, th)
self._agnostic.initialize_tables(['p'], facts, th)
def test_select_1match(self):
# with different types of policies (exercise indexing, large sets,
@ -129,11 +129,11 @@ class TestRuntimePerformance(testbase.TestCase):
th = NREC_THEORY
self._create_large_tables(MAX, th)
self._runtime.insert('r(x,y) :- p(x,x,y), q(x)', th)
self._agnostic.insert('r(x,y) :- p(x,x,y), q(x)', th)
for i in range(100):
# This select returns 1 result
self._runtime.select('r(1, 1)', th)
self._agnostic.select('r(1, 1)', th)
def test_select_100matches(self):
# with different types of policies (exercise indexing, large sets,
@ -142,12 +142,12 @@ class TestRuntimePerformance(testbase.TestCase):
th = NREC_THEORY
self._create_large_tables(MAX, th)
self._runtime.insert('r(x,y) :- p(x,x,y), q(x)', th)
self._agnostic.insert('r(x,y) :- p(x,x,y), q(x)', th)
# This takes about 60ms per select
for i in range(10):
# This select returns 100 results
self._runtime.select('r(x, y)', th)
self._agnostic.select('r(x, y)', th)
def test_simulate_latency(self):
# We think the cost will be the sum of the simulate call + the cost to
@ -159,17 +159,17 @@ class TestRuntimePerformance(testbase.TestCase):
th = NREC_THEORY
self._create_large_tables(MAX, th)
self._runtime.create_policy(ACTION_THEORY,
kind=base.ACTION_POLICY_TYPE)
self._agnostic.create_policy(ACTION_THEORY,
kind=base.ACTION_POLICY_TYPE)
self._runtime.insert('q(0)', th)
self._runtime.insert('s(x) :- q(x), p(x,0,0)', th)
self._agnostic.insert('q(0)', th)
self._agnostic.insert('s(x) :- q(x), p(x,0,0)', th)
# This takes about 13ms per simulate. The query for s(x) can use
# indexing, so it should be efficient.
for _ in range(100):
self._runtime.simulate('s(x)', th, 'p-(0,0,0)',
ACTION_THEORY, delta=True)
self._agnostic.simulate('s(x)', th, 'p-(0,0,0)',
ACTION_THEORY, delta=True)
def test_simulate_throughput(self):
# up to 250 requests per second
@ -235,7 +235,7 @@ class TestDsePerformance(testbase.SqlTestCase):
formula = compile.parse1(
'q(1) :- data:p(1, 2.3, "foo", "bar", 1, %s)' % ('a'*100 + '1'))
self.api['rule'].publish(
'policy-update', [runtime.Event(formula, target=policy)])
'policy-update', [agnostic.Event(formula, target=policy)])
# Poll data and wait til it arrives at engine
driver.poll()
@ -281,7 +281,7 @@ class TestDsePerformance(testbase.SqlTestCase):
'q(1) :- data:p(1, 2.3, "foo", "bar", 1, %s)' % ('a'*100 + '1'))
LOG.info("publishing rule")
self.api['rule'].publish(
'policy-update', [runtime.Event(formula, target=policy)])
'policy-update', [agnostic.Event(formula, target=policy)])
# Poll data and wait til it arrives at engine
driver.poll()

View File

@ -20,7 +20,7 @@ import mox
from congress.dse import dataobj
from congress import harness
from congress.policy import compile
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress.tests import base
from congress.tests import helper
@ -113,7 +113,7 @@ class BenchmarkDatasource(base.Benchmark):
# publish the formula and verify we see a subscription
LOG.debug('%s:: sending formula: %s', self.__class__.__name__, formula)
self.api['rule'].publish('policy-update', [runtime.Event(formula)])
self.api['rule'].publish('policy-update', [agnostic.Event(formula)])
helper.retry_check_subscriptions(
self.engine, [('benchmark', table_name)])
helper.retry_check_subscribers(

View File

@ -31,7 +31,7 @@ from congress.common import config
from congress import harness
from congress.openstack.common import log as logging
from congress.policy import compile
from congress.policy import runtime
from congress.policy_engines import agnostic
from congress.tests import base
import congress.tests.datasources.test_neutron_driver as test_neutron
from congress.tests import helper
@ -170,7 +170,7 @@ class TestCongress(base.SqlTestCase):
formula = test_neutron.create_network_group('p')
LOG.debug("Sending formula: %s", formula)
api['rule'].publish(
'policy-update', [runtime.Event(formula, target=policy)])
'policy-update', [agnostic.Event(formula, target=policy)])
# check we have the proper subscriptions
self.assertTrue('neutron' in cage.services)
neutron = cage.service_object('neutron')
@ -188,7 +188,7 @@ class TestCongress(base.SqlTestCase):
formula = test_neutron.create_network_group('p')
LOG.debug("Sending formula: %s", formula)
api['rule'].publish(
'policy-update', [runtime.Event(formula, target=policy)])
'policy-update', [agnostic.Event(formula, target=policy)])
helper.retry_check_nonempty_last_policy_change(engine)
LOG.debug("All services: %s", cage.services.keys())
neutron = cage.service_object('neutron')
@ -206,7 +206,7 @@ class TestCongress(base.SqlTestCase):
# Send formula
formula = test_neutron.create_networkXnetwork_group('p')
api['rule'].publish(
'policy-update', [runtime.Event(formula, target=policy)])
'policy-update', [agnostic.Event(formula, target=policy)])
helper.retry_check_nonempty_last_policy_change(engine)
# poll datasources
neutron = cage.service_object('neutron')