Lock table and sync rule before each rule insert

In replicated policy engine (PE) mode, lock the policy_rules table and
sync rules before each rule insert, to prevent a conflicting rule from
being inserted by another policy engine.

Supported backends are MySQL and PostgreSQL.
Manually tested with both backends.

TODO: add tempest scenarios to test that recursion is blocked
https://bugs.launchpad.net/congress/+bug/1655787

Closes-Bug: 1638746
Closes-Bug: 1637185

Change-Id: I29c6f63b467905c26fe1d35348a3b7a70d8414b6
Author: Eric K
Date:   2017-01-04 22:23:42 -08:00
parent 507b81f15e
commit 52efca0ae8
7 changed files with 186 additions and 66 deletions


@@ -73,6 +73,9 @@ core_opts = [
    cfg.BoolOpt('enable_execute_action', default=True,
                help='Set the flag to False if you don\'t want Congress '
                     'to execute actions.'),
    cfg.BoolOpt('replicated_policy_engine', default=False,
                help='Set the flag to use congress with replicated policy '
                     'engines.'),
    cfg.BoolOpt('distributed_architecture',
                deprecated_for_removal=True,
                deprecated_reason='distributed architecture is now the only '

@@ -46,3 +46,71 @@ def get_session(autocommit=True, expire_on_commit=False, make_new=False):
    facade = _create_facade_lazily()
    return facade.get_session(autocommit=autocommit,
                              expire_on_commit=expire_on_commit)


def get_locking_session():
    """Obtain db_session that works with table locking.

    Supported backends: MySQL and PostgreSQL.
    Returns the default session if the backend is not supported (e.g. sqlite).
    """
    if is_mysql() or is_postgres():
        db_session = get_session(
            autocommit=False,
            # to prevent implicit new transactions,
            # which UNLOCK tables in MySQL
            expire_on_commit=False,  # need to UNLOCK after commit
            make_new=True)  # brand new facade avoids interference
    else:  # unsupported backend for locking (e.g. sqlite): return default
        db_session = get_session()
    return db_session


def lock_tables(session, tables):
    """Write-lock tables for supported backends: MySQL and PostgreSQL."""
    session.begin(subtransactions=True)
    if is_mysql():  # explicit LOCK TABLES for MySQL
        session.execute('SET autocommit=0')
        for table in tables:
            session.execute('LOCK TABLES {} WRITE'.format(table))
    elif is_postgres():  # explicit LOCK TABLE for PostgreSQL
        session.execute('BEGIN TRANSACTION')
        for table in tables:
            session.execute('LOCK TABLE {} IN EXCLUSIVE MODE'.format(table))


def commit_unlock_tables(session):
    """Commit and unlock tables for supported backends: MySQL and PostgreSQL."""
    session.commit()
    # Also execute COMMIT on the DB backend, because a sqlalchemy session
    # does not guarantee exact boundary correspondence to DB backend
    # transactions, and the DB must commit the transaction before UNLOCK.
    session.execute('COMMIT')
    # unlock
    if is_mysql():
        session.execute('UNLOCK TABLES')
    # PostgreSQL automatically releases table locks at the end of the
    # transaction


def is_mysql():
    """Return true if and only if database backend is mysql."""
    return (cfg.CONF.database.connection is not None and
            (cfg.CONF.database.connection.split(':/')[0] == 'mysql' or
             cfg.CONF.database.connection.split('+')[0] == 'mysql'))


def is_postgres():
    """Return true if and only if database backend is postgres."""
    return (cfg.CONF.database.connection is not None and
            (cfg.CONF.database.connection.split(':/')[0] == 'postgresql' or
             cfg.CONF.database.connection.split('+')[0] == 'postgresql'))


def is_sqlite():
    """Return true if and only if database backend is sqlite."""
    return (cfg.CONF.database.connection is not None and
            (cfg.CONF.database.connection.split(':/')[0] == 'sqlite' or
             cfg.CONF.database.connection.split('+')[0] == 'sqlite'))
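
Taken together, these helpers implement a lock -> write -> commit-and-unlock
sequence. A minimal sketch of the intended calling pattern follows; the
wrapper and its run_insert callable are illustrative, not part of the commit,
and the helpers are only exercised when the backend is MySQL or PostgreSQL:

    from congress.db import api as db_api


    def insert_under_table_lock(run_insert):
        """Illustrative wrapper: run a DB write while policy_rules is locked."""
        session = db_api.get_locking_session()
        session.begin(subtransactions=True)
        try:
            # Write-lock the table so no other policy engine can insert a
            # conflicting rule until we commit.
            db_api.lock_tables(session=session, tables=['policy_rules'])
            # e.g. db_policy_rules.add_policy_rule(..., session=session)
            run_insert(session)
        finally:
            # COMMIT on the backend, then UNLOCK TABLES on MySQL;
            # PostgreSQL releases the lock when the transaction ends.
            db_api.commit_unlock_tables(session=session)
            session.close()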


@@ -20,6 +20,7 @@ from __future__ import absolute_import
import time
import eventlet
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_messaging import exceptions as messaging_exceptions
@@ -34,6 +35,7 @@ from congress.datalog import materialized
from congress.datalog import nonrecursive
from congress.datalog import unify
from congress.datalog import utility
from congress.db import api as db_api
from congress.db import db_policy_rules
from congress.dse2 import data_service
from congress import exception
@@ -330,78 +332,107 @@ class Runtime (object):
        return [rule.to_dict() for rule in rules]

    # Note(thread-safety): blocking function
    # acquire lock to avoid periodic sync from undoing insert before persisted
    @lockutils.synchronized('congress_synchronize_rules')
    def persistent_insert_rule(self, policy_name, str_rule, rule_name,
                               comment):
        """Insert and persists rule into policy_name."""
        if cfg.CONF.replicated_policy_engine:
            # get session
            db_session = db_api.get_locking_session()
            db_session.begin(subtransactions=True)
            # lock policy_rules table to prevent conflicting rules
            # insertion (say causing unsupported recursion)
            db_api.lock_tables(session=db_session, tables=['policy_rules'])
            # synchronize policy rules to get the latest, locked state;
            # non-locking version because lock already acquired,
            # avoid deadlock
            self.synchronizer.synchronize_rules_nonlocking(
                db_session=db_session)
        else:
            db_session = None
        try:
            # Reject rules inserted into non-persisted policies
            # (i.e. datasource policies)
            # Note(thread-safety): blocking call
            policy_name = db_policy_rules.policy_name(
                policy_name)
            # call synchronizer to make sure policy is synchronized in memory
            self.synchronizer.sync_one_policy(policy_name)
            # Note(thread-safety): blocking call
            policies = db_policy_rules.get_policies()
            persisted_policies = set([p.name for p in policies])
            if policy_name not in persisted_policies:
                if policy_name in self.theory:
                    LOG.debug(
                        "insert_persisted_rule error: rule not permitted for "
                        "policy %s", policy_name)
                    raise exception.PolicyRuntimeException(
                        name='rule_not_permitted')
            id_ = uuidutils.generate_uuid()
            try:
                rule = self.parse(str_rule)
            except exception.PolicyException as e:
                # TODO(thinrichs): change compiler to provide these error_code
                # names directly.
                raise exception.PolicyException(str(e), name='rule_syntax')
            if len(rule) == 1:
                rule = rule[0]
            else:
                msg = ("Received multiple rules: " +
                       "; ".join(str(x) for x in rule))
                raise exception.PolicyRuntimeException(
                    msg, name='multiple_rules')
            rule.set_id(id_)
            rule.set_name(rule_name)
            rule.set_comment(comment or "")
            rule.set_original_str(str_rule)
            changes = self._safe_process_policy_update(
                rule, policy_name, persistent=True)
            # save rule to database if change actually happened.
            # Note: change produced may not be equivalent to original rule
            # because of column-reference elimination.
            if len(changes) > 0:
                d = {'rule': rule.pretty_str(),
                     'id': str(rule.id),
                     'comment': rule.comment,
                     'name': rule.name}
                try:
                    # Note(thread-safety): blocking call
                    db_policy_rules.add_policy_rule(
                        d['id'], policy_name, str_rule, d['comment'],
                        rule_name=d['name'], session=db_session)
                    # do not begin a new transaction here, to avoid
                    # implicitly releasing the table lock
                    return (d['id'], d)
                except Exception as db_exception:
                    try:
                        self._safe_process_policy_update(
                            rule, policy_name, insert=False)
                        raise exception.PolicyRuntimeException(
                            "Error while writing to DB: %s."
                            % str(db_exception))
                    except Exception as change_exception:
                        raise exception.PolicyRuntimeException(
                            "Error thrown during recovery from DB error. "
                            "Inconsistent state. DB error: %s. "
                            "New error: %s." % (str(db_exception),
                                                str(change_exception)))
            # change not accepted means it was already there
            raise exception.PolicyRuntimeException(
                name='rule_already_exists')
        finally:
            # commit, unlock, and close db_session
            if db_session:
                db_api.commit_unlock_tables(session=db_session)
                db_session.close()
# Note(thread-safety): blocking function
def persistent_delete_rule(self, id_, policy_name_or_id):
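
To illustrate the race the locking in persistent_insert_rule closes:
without the table lock, two replicated engines could each run the
recursion check against a stale rule set and both persist, yielding
unsupported mutual recursion. A hedged sketch, assuming runtime_a and
runtime_b are Runtime instances of two replicated policy engines
sharing one database (setup elided; policy name and rules are
illustrative):

    # Engine A takes the policy_rules write lock, re-syncs, passes the
    # recursion check, persists its rule, then commits and unlocks.
    runtime_a.persistent_insert_rule('classification', 'p(x) :- q(x)',
                                     'rule_a', comment='')

    # Engine B, racing with A, blocks in lock_tables() until A commits.
    # Under the lock, synchronize_rules_nonlocking() pulls in rule_a, so
    # the recursion check sees the p/q cycle and rejects this insert
    # instead of silently persisting unsupported recursion.
    runtime_b.persistent_insert_rule('classification', 'q(x) :- p(x)',
                                     'rule_b', comment='')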


@@ -28,6 +28,7 @@ from oslo_log import log as logging
from oslo_service import service

from congress.common import config
from congress.db import api as db_api

# FIXME It has to initialize the distributed_architecture flag based on the
# config file before the python interpreter imports a python file which has
# an if-statement for deepsix. Since the default value of the flag is False
@@ -144,6 +145,12 @@ def main():
        sys.exit("ERROR: Unable to find configuration file via default "
                 "search paths (~/.congress/, ~/, /etc/congress/, /etc/) "
                 "and the '--config-file' option!")

    if cfg.CONF.replicated_policy_engine and not (
            db_api.is_mysql() or db_api.is_postgres()):
        sys.exit("ERROR: replicated_policy_engine option can be used only "
                 "with MySQL or PostgreSQL database backends. Please set "
                 "the connection option in [database] section of "
                 "congress.conf to use a supported backend.")

    config.setup_logging()
    if not (cfg.CONF.api or cfg.CONF.policy_engine or cfg.CONF.datasources):


@@ -186,13 +186,16 @@ class PolicyRuleSynchronizer(object):

    @periodics.periodic(spacing=cfg.CONF.datasource_sync_period)
    @lockutils.synchronized('congress_synchronize_rules')
    def synchronize_rules(self, db_session=None):
        self.synchronize_rules_nonlocking(db_session=db_session)

    def synchronize_rules_nonlocking(self, db_session=None):
        LOG.debug("Synchronizing rules on node %s", self.node.node_id)
        try:
            # Read rules from DB.
            configured_rules = []
            configured_facts = []
            for r in db_policy_rules.get_policy_rules(session=db_session):
                if ':-' in r.rule:  # if rule has body
                    configured_rules.append({'rule': r.rule,
                                             'id': r.id,
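
The split exists because synchronize_rules holds the
congress_synchronize_rules lock via its decorator, and
persistent_insert_rule is decorated with the same lock, so a caller
that already holds it must use the undecorated variant. A minimal
sketch of the hazard (the function below is illustrative, not from the
commit):

    from oslo_concurrency import lockutils


    @lockutils.synchronized('congress_synchronize_rules')
    def insert_while_holding_lock(synchronizer):
        # The lock is already held here, so call the undecorated variant.
        # Calling synchronize_rules() instead would try to re-acquire
        # 'congress_synchronize_rules'; these locks are not reentrant,
        # so the thread would deadlock against itself.
        synchronizer.synchronize_rules_nonlocking()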


@@ -57,6 +57,7 @@ function configure_congress {
    iniset $CONGRESS_CONF oslo_policy policy_file $CONGRESS_POLICY_FILE
    iniset $CONGRESS_CONF DEFAULT auth_strategy $CONGRESS_AUTH_STRATEGY
    iniset $CONGRESS_CONF DEFAULT datasource_sync_period 30
    iniset $CONGRESS_CONF DEFAULT replicated_policy_engine "$CONGRESS_REPLICATED"

    # if [ "$CONGRESS_MULTIPROCESS_DEPLOYMENT" == "False" ]; then
    #     iniset $CONGRESS_CONF DEFAULT transport_url $CONGRESS_TRANSPORT_URL
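
With this hook, the option can be toggled from DevStack's local.conf by
setting the variable the plugin consumes (assuming CONGRESS_REPLICATED
defaults to False in the plugin settings):

    [[local|localrc]]
    CONGRESS_REPLICATED=True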


@@ -66,6 +66,13 @@ For example:

.. code-block:: text

    [DEFAULT]
    transport_url = rabbit://<rabbit-userid>:<rabbit-password>@<rabbit-host-address>:5672

In addition, the ``replicated_policy_engine`` option should be set to ``True``.

.. code-block:: text

    [DEFAULT]
    replicated_policy_engine = True

All hosts should be configured with a database connection that points to the
shared database deployed in step 1, not the local address shown in
`separate install instructions`__.
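
For example, every host's ``[database]`` section would point at the one
shared backend; a sketch with placeholder host and credentials (the exact
URL depends on the chosen driver):

.. code-block:: text

    [database]
    connection = mysql+pymysql://congress:<password>@<shared-db-host>/congress?charset=utf8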