Merge "Lock table and sync rule before each rule insert"

This commit is contained in:
Jenkins 2017-01-30 07:44:04 +00:00 committed by Gerrit Code Review
commit 5f6e9ce316
7 changed files with 186 additions and 66 deletions

View File

@ -72,6 +72,9 @@ core_opts = [
cfg.BoolOpt('enable_execute_action', default=True,
help='Set the flag to False if you don\'t want Congress '
'to execute actions.'),
cfg.BoolOpt('replicated_policy_engine', default=False,
help='Set the flag to use congress with replicated policy '
'engines.'),
cfg.BoolOpt('distributed_architecture',
deprecated_for_removal=True,
deprecated_reason='distributed architecture is now the only '

View File

@ -46,3 +46,71 @@ def get_session(autocommit=True, expire_on_commit=False, make_new=False):
facade = _create_facade_lazily()
return facade.get_session(autocommit=autocommit,
expire_on_commit=expire_on_commit)
def get_locking_session():
"""Obtain db_session that works with table locking
supported backends: MySQL and PostgreSQL
return default session if backend not supported (eg. sqlite)
"""
if is_mysql() or is_postgres():
db_session = get_session(
autocommit=False,
# to prevent implicit new transactions,
# which UNLOCKS in MySQL
expire_on_commit=False, # need to UNLOCK after commit
make_new=True) # brand new facade avoids interference
else: # unsupported backend for locking (eg sqlite), return default
db_session = get_session()
return db_session
def lock_tables(session, tables):
"""Write-lock tables for supported backends: MySQL and PostgreSQL"""
session.begin(subtransactions=True)
if is_mysql(): # Explicitly LOCK TABLES for MySQL
session.execute('SET autocommit=0')
for table in tables:
session.execute('LOCK TABLES {} WRITE'.format(table))
elif is_postgres(): # Explicitly LOCK TABLE for Postgres
session.execute('BEGIN TRANSACTION')
for table in tables:
session.execute('LOCK TABLE {} IN EXCLUSIVE MODE'.format(table))
def commit_unlock_tables(session):
"""Commit and unlock tables for supported backends: MySQL and PostgreSQL"""
session.commit()
session.execute('COMMIT') # execute COMMIT on DB backend
# because sqlalchemy session does not guarantee
# exact boundary correspondence to DB backend transactions
# We must guarantee DB commits transaction before UNLOCK
# unlock
if is_mysql():
session.execute('UNLOCK TABLES')
# postgres automatically releases lock at transaction
def is_mysql():
"""Return true if and only if database backend is mysql"""
return (cfg.CONF.database.connection is not None and
(cfg.CONF.database.connection.split(':/')[0] == 'mysql' or
cfg.CONF.database.connection.split('+')[0] == 'mysql'))
def is_postgres():
"""Return true if and only if database backend is postgres"""
return (cfg.CONF.database.connection is not None and
(cfg.CONF.database.connection.split(':/')[0] == 'postgresql' or
cfg.CONF.database.connection.split('+')[0] == 'postgresql'))
def is_sqlite():
"""Return true if and only if database backend is sqlite"""
return (cfg.CONF.database.connection is not None and
(cfg.CONF.database.connection.split(':/')[0] == 'sqlite' or
cfg.CONF.database.connection.split('+')[0] == 'sqlite'))

View File

@ -20,6 +20,7 @@ from __future__ import absolute_import
import time
import eventlet
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_messaging import exceptions as messaging_exceptions
@ -34,6 +35,7 @@ from congress.datalog import materialized
from congress.datalog import nonrecursive
from congress.datalog import unify
from congress.datalog import utility
from congress.db import api as db_api
from congress.db import db_policy_rules
from congress.dse2 import data_service
from congress import exception
@ -330,78 +332,107 @@ class Runtime (object):
return [rule.to_dict() for rule in rules]
# Note(thread-safety): blocking function
# acquire lock to avoid periodic sync from undoing insert before persisted
@lockutils.synchronized('congress_synchronize_rules')
def persistent_insert_rule(self, policy_name, str_rule, rule_name,
comment):
"""Insert and persists rule into policy_name."""
# Reject rules inserted into non-persisted policies
# (i.e. datasource policies)
# Note(thread-safety): blocking call
policy_name = db_policy_rules.policy_name(policy_name)
# call synchronizer to make sure policy is synchronized in memory
self.synchronizer.sync_one_policy(policy_name)
# Note(thread-safety): blocking call
policies = db_policy_rules.get_policies()
persisted_policies = set([p.name for p in policies])
if policy_name not in persisted_policies:
if policy_name in self.theory:
LOG.debug(
"insert_persisted_rule error: rule not permitted for "
"policy %s", policy_name)
raise exception.PolicyRuntimeException(
name='rule_not_permitted')
id_ = uuidutils.generate_uuid()
try:
rule = self.parse(str_rule)
except exception.PolicyException as e:
# TODO(thinrichs): change compiler to provide these error_code
# names directly.
raise exception.PolicyException(str(e), name='rule_syntax')
if cfg.CONF.replicated_policy_engine:
# get session
db_session = db_api.get_locking_session()
db_session.begin(subtransactions=True)
if len(rule) == 1:
rule = rule[0]
else:
msg = ("Received multiple rules: " +
"; ".join(str(x) for x in rule))
raise exception.PolicyRuntimeException(msg, name='multiple_rules')
# lock policy_rules table to prevent conflicting rules
# insertion (say causing unsupported recursion)
db_api.lock_tables(session=db_session, tables=['policy_rules'])
rule.set_id(id_)
rule.set_name(rule_name)
rule.set_comment(comment or "")
rule.set_original_str(str_rule)
changes = self._safe_process_policy_update(
rule, policy_name, persistent=True)
# save rule to database if change actually happened.
# Note: change produced may not be equivalent to original rule because
# of column-reference elimination.
if len(changes) > 0:
d = {'rule': rule.pretty_str(),
'id': str(rule.id),
'comment': rule.comment,
'name': rule.name}
# synchronize policy rules to get latest state, locked state
# non-locking version because lock already acquired,
# avoid deadlock
self.synchronizer.synchronize_rules_nonlocking(
db_session=db_session)
else:
db_session = None
# Reject rules inserted into non-persisted policies
# (i.e. datasource policies)
# Note(thread-safety): blocking call
policy_name = db_policy_rules.policy_name(
policy_name)
# call synchronizer to make sure policy is synchronized in memory
self.synchronizer.sync_one_policy(policy_name)
# Note(thread-safety): blocking call
policies = db_policy_rules.get_policies()
persisted_policies = set([p.name for p in policies])
if policy_name not in persisted_policies:
if policy_name in self.theory:
LOG.debug(
"insert_persisted_rule error: rule not permitted for "
"policy %s", policy_name)
raise exception.PolicyRuntimeException(
name='rule_not_permitted')
id_ = uuidutils.generate_uuid()
try:
# Note(thread-safety): blocking call
db_policy_rules.add_policy_rule(
d['id'], policy_name, str_rule, d['comment'],
rule_name=d['name'])
return (d['id'], d)
except Exception as db_exception:
try:
self._safe_process_policy_update(
rule, policy_name, insert=False)
raise exception.PolicyRuntimeException(
"Error while writing to DB: %s."
% str(db_exception))
except Exception as change_exception:
raise exception.PolicyRuntimeException(
"Error thrown during recovery from DB error. "
"Inconsistent state. DB error: %s. "
"New error: %s." % (str(db_exception),
str(change_exception)))
rule = self.parse(str_rule)
except exception.PolicyException as e:
# TODO(thinrichs): change compiler to provide these error_code
# names directly.
raise exception.PolicyException(str(e), name='rule_syntax')
# change not accepted means it was already there
raise exception.PolicyRuntimeException(
name='rule_already_exists')
if len(rule) == 1:
rule = rule[0]
else:
msg = ("Received multiple rules: " +
"; ".join(str(x) for x in rule))
raise exception.PolicyRuntimeException(
msg, name='multiple_rules')
rule.set_id(id_)
rule.set_name(rule_name)
rule.set_comment(comment or "")
rule.set_original_str(str_rule)
changes = self._safe_process_policy_update(
rule, policy_name, persistent=True)
# save rule to database if change actually happened.
# Note: change produced may not be equivalent to original rule
# because of column-reference elimination.
if len(changes) > 0:
d = {'rule': rule.pretty_str(),
'id': str(rule.id),
'comment': rule.comment,
'name': rule.name}
try:
# Note(thread-safety): blocking call
db_policy_rules.add_policy_rule(
d['id'], policy_name, str_rule, d['comment'],
rule_name=d['name'], session=db_session)
# do not begin to avoid implicitly releasing table
# lock due to starting new transaction
return (d['id'], d)
except Exception as db_exception:
try:
self._safe_process_policy_update(
rule, policy_name, insert=False)
raise exception.PolicyRuntimeException(
"Error while writing to DB: %s."
% str(db_exception))
except Exception as change_exception:
raise exception.PolicyRuntimeException(
"Error thrown during recovery from DB error. "
"Inconsistent state. DB error: %s. "
"New error: %s." % (str(db_exception),
str(change_exception)))
# change not accepted means it was already there
raise exception.PolicyRuntimeException(
name='rule_already_exists')
finally:
# commit, unlock, and close db_session
if db_session:
db_api.commit_unlock_tables(session=db_session)
db_session.close()
# Note(thread-safety): blocking function
def persistent_delete_rule(self, id_, policy_name_or_id):

View File

@ -28,6 +28,7 @@ from oslo_log import log as logging
from oslo_service import service
from congress.common import config
from congress.db import api as db_api
# FIXME It has to initialize distributed_architecture flag basing on the
# config file before the python interpreter imports python file which has
# if-statement for deepsix. Since the default value of the flag is False
@ -144,6 +145,12 @@ def main():
sys.exit("ERROR: Unable to find configuration file via default "
"search paths ~/.congress/, ~/, /etc/congress/, /etc/) and "
"the '--config-file' option!")
if cfg.CONF.replicated_policy_engine and not (
db_api.is_mysql() or db_api.is_postgres()):
sys.exit("ERROR: replicated_policy_engine option can be used only with"
" MySQL or PostgreSQL database backends. Please set the "
"connection option in [database] section of congress.conf "
"to use a supported backend.")
config.setup_logging()
if not (cfg.CONF.api or cfg.CONF.policy_engine or cfg.CONF.datasources):

View File

@ -186,13 +186,16 @@ class PolicyRuleSynchronizer(object):
@periodics.periodic(spacing=cfg.CONF.datasource_sync_period)
@lockutils.synchronized('congress_synchronize_rules')
def synchronize_rules(self):
def synchronize_rules(self, db_session=None):
self.synchronize_rules_nonlocking(db_session=db_session)
def synchronize_rules_nonlocking(self, db_session=None):
LOG.debug("Synchronizing rules on node %s", self.node.node_id)
try:
# Read rules from DB.
configured_rules = []
configured_facts = []
for r in db_policy_rules.get_policy_rules():
for r in db_policy_rules.get_policy_rules(session=db_session):
if ':-' in r.rule: # if rule has body
configured_rules.append({'rule': r.rule,
'id': r.id,

View File

@ -57,6 +57,7 @@ function configure_congress {
iniset $CONGRESS_CONF oslo_policy policy_file $CONGRESS_POLICY_FILE
iniset $CONGRESS_CONF DEFAULT auth_strategy $CONGRESS_AUTH_STRATEGY
iniset $CONGRESS_CONF DEFAULT datasource_sync_period 30
iniset $CONGRESS_CONF DEFAULT replicated_policy_engine "$CONGRESS_REPLICATED"
# if [ "$CONGRESS_MULTIPROCESS_DEPLOYMENT" == "False" ]; then
# iniset $CONGRESS_CONF DEFAULT transport_url $CONGRESS_TRANSPORT_URL

View File

@ -66,6 +66,13 @@ For example:
[DEFAULT]
transport_url = rabbit://<rabbit-userid>:<rabbit-password>@<rabbit-host-address>:5672
In addition, the ``replicated_policy_engine`` option should be set to ``True``.
.. code-block:: text
[DEFAULT]
replicated_policy_engine = True
All hosts should be configured with a database connection that points to the
shared database deployed in step 1, not the local address shown in
`separate install instructions`__.