Merge "Make persistent_insert_rule error on IncompleteSchemaException"

This commit is contained in:
Jenkins 2016-03-21 18:15:59 +00:00 committed by Gerrit Code Review
commit 5e2c7cde59
3 changed files with 80 additions and 20 deletions

View File

@ -819,13 +819,12 @@ class Literal (object):
if theory.kind != base.DATASOURCE_POLICY_TYPE: # eventually remove
raise exception.PolicyException(
"Literal {} uses column references, but '{}' does not "
"reference a datasource policy".format(self, theory.name))
"reference a datasource policy.".format(self, theory.name))
schema = theory.schema
if self.table.table not in schema:
raise exception.IncompleteSchemaException(
"Literal {} uses unknown table {} "
"from schema {}".format(
str(self), str(self.table.table), str(schema)))
"Literal {} uses unknown table {}.".format(
str(self), str(self.table.table)))
# check if named arguments conflict with positional or named arguments
errors = []

View File

@ -360,7 +360,8 @@ class Runtime (object):
rule.set_name(rule_name)
rule.set_comment(comment or "")
rule.set_original_str(str_rule)
changes = self._safe_process_policy_update(rule, policy_name)
changes = self._safe_process_policy_update(
rule, policy_name, persistent=True)
# save rule to database if change actually happened.
# Note: change produced may not be equivalent to original rule because
# of column-reference elimination.
@ -378,8 +379,11 @@ class Runtime (object):
try:
self._safe_process_policy_update(
rule, policy_name, insert=False)
raise exception.PolicyRuntimeException(
"Error while writing to DB: %s."
% str(db_exception))
except Exception as change_exception:
raise exception.PolicyException(
raise exception.PolicyRuntimeException(
"Error thrown during recovery from DB error. "
"Inconsistent state. DB error: %s. "
"New error: %s." % (str(db_exception),
@ -422,7 +426,7 @@ class Runtime (object):
rule.policy_name)
def _safe_process_policy_update(self, parsed_rule, policy_name,
insert=True):
insert=True, persistent=False):
if policy_name not in self.theory:
raise exception.PolicyRuntimeException(
'Policy ID %s does not exist' % policy_name,
@ -431,7 +435,8 @@ class Runtime (object):
formula=parsed_rule,
insert=insert,
target=policy_name)
(permitted, changes) = self.process_policy_update([event])
(permitted, changes) = self.process_policy_update(
[event], persistent=persistent)
if not permitted:
raise exception.PolicyException(
";".join([str(x) for x in changes]),
@ -685,15 +690,15 @@ class Runtime (object):
else:
return self._delete_obj(formula, target)
def update(self, sequence, target=None):
def update(self, sequence, target=None, persistent=False):
"""Event handler for applying an arbitrary sequence of insert/deletes.
If TARGET is supplied, it overrides the targets in SEQUENCE.
"""
if isinstance(sequence, six.string_types):
return self._update_string(sequence, target)
return self._update_string(sequence, target, persistent)
else:
return self._update_obj(sequence, target)
return self._update_obj(sequence, target, persistent)
def policy(self, target=None):
"""Event handler for querying policy."""
@ -930,10 +935,10 @@ class Runtime (object):
theory_string)
# update
def _update_string(self, events_string, theory_string):
def _update_string(self, events_string, theory_string, persistent=False):
assert False, "Not yet implemented--need parser to read events"
def _update_obj(self, events, theory_string):
def _update_obj(self, events, theory_string, persistent=False):
"""Apply events.
Checks if applying EVENTS is permitted and if not
@ -955,7 +960,8 @@ class Runtime (object):
if len(errors) > 0:
return (False, errors)
# eliminate column refs where possible
enabled, disabled, errs = self._process_limbo_events(events)
enabled, disabled, errs = self._process_limbo_events(
events, persistent)
for err in errs:
errors.extend(err[1])
if len(errors) > 0:
@ -974,7 +980,7 @@ class Runtime (object):
"""
self.disabled_events.extend(events)
def _process_limbo_events(self, events):
def _process_limbo_events(self, events, persistent=False):
"""Assume that events.theory is an object.
Return (<enabled>, <disabled>, <errors>)
@ -996,8 +1002,14 @@ class Runtime (object):
# doesn't copy over ID since it creates a new one
event.formula.set_id(oldformula.id)
enabled.append(event)
except exception.IncompleteSchemaException:
disabled.append(event)
except exception.IncompleteSchemaException as e:
if persistent:
# FIXME(ekcs): inconsistent behavior?
# persistent_insert with 'unknown:p(x)' allowed but
# 'unknown:p(colname=x)' disallowed
raise exception.PolicyException(str(e), name='rule_syntax')
else:
disabled.append(event)
except exception.PolicyException as e:
errors.append((event, [e]))
return enabled, disabled, errors
@ -1770,11 +1782,11 @@ class DseRuntime (Runtime, deepsix.deepSix):
# update the policy and subscriptions to data tables.
self.last_policy_change = self.process_policy_update(msg.body.data)
def process_policy_update(self, events):
def process_policy_update(self, events, persistent=False):
self.log("process_policy_update %s" % events)
# body_only so that we don't subscribe to tables in the head
oldtables = self.tablenames(body_only=True)
result = self.update(events)
result = self.update(events, persistent=persistent)
newtables = self.tablenames(body_only=True)
self.update_table_subscriptions(oldtables, newtables)
return result

View File

@ -32,6 +32,8 @@ from congress.policy_engines import agnostic
from congress.tests import base
from congress.tests import helper
import congress.dse.d6cage
LOG = logging.getLogger(__name__)
NREC_THEORY = 'non-recursive theory'
@ -1586,7 +1588,7 @@ class TestActionExecution(base.TestCase):
self.assertFalse(run.request.called)
class TestDisabledRules(base.TestCase):
class TestDisabledRules(base.SqlTestCase):
"""Tests for Runtime's ability to enable/disable rules."""
# insertions
def test_insert_enabled(self):
@ -1608,6 +1610,53 @@ class TestDisabledRules(base.TestCase):
self.assertEqual(len(run.disabled_events), 1)
self.assertEqual(len(obj.content()), 0)
def test_persistent_insert_disabled(self):
"""Test that persistent_insert_rule errors on IncompleteSchemaException
When a table schema is not available, named column references are
permitted but disabled in non-persistent rule insert to allow for
late-arriving schema when importing rules already in DB.
This behavior is not necessary in persistent_insert.
"""
# FIXME(ekcs): test at Runtime level rather than DseRuntime level.
# temporarily test on DseRuntime because the persistence layer of
# Runtime depends on DseRuntime
cage = congress.dse.d6cage.d6Cage()
cage.loadModule("TestPolicy", helper.policy_module_path())
cage.createservice(name="policy", moduleName="TestPolicy",
args={'d6cage': cage, 'rootdir': '',
'log_actions_only': True})
run = cage.services['policy']['object']
run.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
run.persistent_create_policy('policy')
obj = run.policy_object('policy')
run.insert('p(x) :- data:q(id=x)')
try:
run.persistent_insert_rule('policy', 'p(x) :- data:q(id=x)',
'', '')
except exception.PolicyException as e:
self.assertTrue(
'Literal data:q(id=x) uses unknown table q'
in str(e),
'Named column reference on unknown table '
'should be disallowed in persistent insert')
self.assertEqual(len(run.disabled_events), 0)
self.assertEqual(len(obj.content()), 0)
try:
run.persistent_insert_rule('policy', 'p(x) :- unknown:q(id=x)',
'', '')
except exception.PolicyException as e:
self.assertTrue(
'Literal unknown:q(id=x) uses named arguments, but the '
'schema is unknown.'
in str(e),
'Named column reference on unknown table '
'should be disallowed in persistent insert')
self.assertEqual(len(run.disabled_events), 0)
self.assertEqual(len(obj.content()), 0)
def test_insert_errors(self):
run = agnostic.Runtime()
run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)