diff --git a/congress/common/config.py b/congress/common/config.py index b08d7ea59..b52f8404c 100644 --- a/congress/common/config.py +++ b/congress/common/config.py @@ -72,6 +72,9 @@ core_opts = [ cfg.BoolOpt('enable_execute_action', default=True, help='Set the flag to False if you don\'t want Congress ' 'to execute actions.'), + cfg.BoolOpt('replicated_policy_engine', default=False, + help='Set the flag to use congress with replicated policy ' + 'engines.'), cfg.BoolOpt('distributed_architecture', deprecated_for_removal=True, deprecated_reason='distributed architecture is now the only ' diff --git a/congress/db/api.py b/congress/db/api.py index ced7054b5..1f26c1f40 100644 --- a/congress/db/api.py +++ b/congress/db/api.py @@ -46,3 +46,71 @@ def get_session(autocommit=True, expire_on_commit=False, make_new=False): facade = _create_facade_lazily() return facade.get_session(autocommit=autocommit, expire_on_commit=expire_on_commit) + + +def get_locking_session(): + """Obtain db_session that works with table locking + + supported backends: MySQL and PostgreSQL + return default session if backend not supported (eg. sqlite) + """ + if is_mysql() or is_postgres(): + db_session = get_session( + autocommit=False, + # to prevent implicit new transactions, + # which UNLOCKS in MySQL + expire_on_commit=False, # need to UNLOCK after commit + make_new=True) # brand new facade avoids interference + + else: # unsupported backend for locking (eg sqlite), return default + db_session = get_session() + + return db_session + + +def lock_tables(session, tables): + """Write-lock tables for supported backends: MySQL and PostgreSQL""" + session.begin(subtransactions=True) + if is_mysql(): # Explicitly LOCK TABLES for MySQL + session.execute('SET autocommit=0') + for table in tables: + session.execute('LOCK TABLES {} WRITE'.format(table)) + elif is_postgres(): # Explicitly LOCK TABLE for Postgres + session.execute('BEGIN TRANSACTION') + for table in tables: + session.execute('LOCK TABLE {} IN EXCLUSIVE MODE'.format(table)) + + +def commit_unlock_tables(session): + """Commit and unlock tables for supported backends: MySQL and PostgreSQL""" + session.commit() + session.execute('COMMIT') # execute COMMIT on DB backend + # because sqlalchemy session does not guarantee + # exact boundary correspondence to DB backend transactions + # We must guarantee DB commits transaction before UNLOCK + + # unlock + if is_mysql(): + session.execute('UNLOCK TABLES') + # postgres automatically releases lock at transaction + + +def is_mysql(): + """Return true if and only if database backend is mysql""" + return (cfg.CONF.database.connection is not None and + (cfg.CONF.database.connection.split(':/')[0] == 'mysql' or + cfg.CONF.database.connection.split('+')[0] == 'mysql')) + + +def is_postgres(): + """Return true if and only if database backend is postgres""" + return (cfg.CONF.database.connection is not None and + (cfg.CONF.database.connection.split(':/')[0] == 'postgresql' or + cfg.CONF.database.connection.split('+')[0] == 'postgresql')) + + +def is_sqlite(): + """Return true if and only if database backend is sqlite""" + return (cfg.CONF.database.connection is not None and + (cfg.CONF.database.connection.split(':/')[0] == 'sqlite' or + cfg.CONF.database.connection.split('+')[0] == 'sqlite')) diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py index cd53893e7..0e11401ca 100644 --- a/congress/policy_engines/agnostic.py +++ b/congress/policy_engines/agnostic.py @@ -20,6 +20,7 @@ from __future__ import absolute_import import time import eventlet +from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging from oslo_messaging import exceptions as messaging_exceptions @@ -34,6 +35,7 @@ from congress.datalog import materialized from congress.datalog import nonrecursive from congress.datalog import unify from congress.datalog import utility +from congress.db import api as db_api from congress.db import db_policy_rules from congress.dse2 import data_service from congress import exception @@ -330,78 +332,107 @@ class Runtime (object): return [rule.to_dict() for rule in rules] # Note(thread-safety): blocking function + # acquire lock to avoid periodic sync from undoing insert before persisted + @lockutils.synchronized('congress_synchronize_rules') def persistent_insert_rule(self, policy_name, str_rule, rule_name, comment): """Insert and persists rule into policy_name.""" - # Reject rules inserted into non-persisted policies - # (i.e. datasource policies) - # Note(thread-safety): blocking call - policy_name = db_policy_rules.policy_name(policy_name) - # call synchronizer to make sure policy is synchronized in memory - self.synchronizer.sync_one_policy(policy_name) - # Note(thread-safety): blocking call - policies = db_policy_rules.get_policies() - persisted_policies = set([p.name for p in policies]) - if policy_name not in persisted_policies: - if policy_name in self.theory: - LOG.debug( - "insert_persisted_rule error: rule not permitted for " - "policy %s", policy_name) - raise exception.PolicyRuntimeException( - name='rule_not_permitted') - - id_ = uuidutils.generate_uuid() try: - rule = self.parse(str_rule) - except exception.PolicyException as e: - # TODO(thinrichs): change compiler to provide these error_code - # names directly. - raise exception.PolicyException(str(e), name='rule_syntax') + if cfg.CONF.replicated_policy_engine: + # get session + db_session = db_api.get_locking_session() + db_session.begin(subtransactions=True) - if len(rule) == 1: - rule = rule[0] - else: - msg = ("Received multiple rules: " + - "; ".join(str(x) for x in rule)) - raise exception.PolicyRuntimeException(msg, name='multiple_rules') + # lock policy_rules table to prevent conflicting rules + # insertion (say causing unsupported recursion) + db_api.lock_tables(session=db_session, tables=['policy_rules']) - rule.set_id(id_) - rule.set_name(rule_name) - rule.set_comment(comment or "") - rule.set_original_str(str_rule) - changes = self._safe_process_policy_update( - rule, policy_name, persistent=True) - # save rule to database if change actually happened. - # Note: change produced may not be equivalent to original rule because - # of column-reference elimination. - if len(changes) > 0: - d = {'rule': rule.pretty_str(), - 'id': str(rule.id), - 'comment': rule.comment, - 'name': rule.name} + # synchronize policy rules to get latest state, locked state + # non-locking version because lock already acquired, + # avoid deadlock + self.synchronizer.synchronize_rules_nonlocking( + db_session=db_session) + else: + db_session = None + + # Reject rules inserted into non-persisted policies + # (i.e. datasource policies) + # Note(thread-safety): blocking call + policy_name = db_policy_rules.policy_name( + policy_name) + # call synchronizer to make sure policy is synchronized in memory + self.synchronizer.sync_one_policy(policy_name) + # Note(thread-safety): blocking call + policies = db_policy_rules.get_policies() + persisted_policies = set([p.name for p in policies]) + if policy_name not in persisted_policies: + if policy_name in self.theory: + LOG.debug( + "insert_persisted_rule error: rule not permitted for " + "policy %s", policy_name) + raise exception.PolicyRuntimeException( + name='rule_not_permitted') + + id_ = uuidutils.generate_uuid() try: - # Note(thread-safety): blocking call - db_policy_rules.add_policy_rule( - d['id'], policy_name, str_rule, d['comment'], - rule_name=d['name']) - return (d['id'], d) - except Exception as db_exception: - try: - self._safe_process_policy_update( - rule, policy_name, insert=False) - raise exception.PolicyRuntimeException( - "Error while writing to DB: %s." - % str(db_exception)) - except Exception as change_exception: - raise exception.PolicyRuntimeException( - "Error thrown during recovery from DB error. " - "Inconsistent state. DB error: %s. " - "New error: %s." % (str(db_exception), - str(change_exception))) + rule = self.parse(str_rule) + except exception.PolicyException as e: + # TODO(thinrichs): change compiler to provide these error_code + # names directly. + raise exception.PolicyException(str(e), name='rule_syntax') - # change not accepted means it was already there - raise exception.PolicyRuntimeException( - name='rule_already_exists') + if len(rule) == 1: + rule = rule[0] + else: + msg = ("Received multiple rules: " + + "; ".join(str(x) for x in rule)) + raise exception.PolicyRuntimeException( + msg, name='multiple_rules') + + rule.set_id(id_) + rule.set_name(rule_name) + rule.set_comment(comment or "") + rule.set_original_str(str_rule) + changes = self._safe_process_policy_update( + rule, policy_name, persistent=True) + # save rule to database if change actually happened. + # Note: change produced may not be equivalent to original rule + # because of column-reference elimination. + if len(changes) > 0: + d = {'rule': rule.pretty_str(), + 'id': str(rule.id), + 'comment': rule.comment, + 'name': rule.name} + try: + # Note(thread-safety): blocking call + db_policy_rules.add_policy_rule( + d['id'], policy_name, str_rule, d['comment'], + rule_name=d['name'], session=db_session) + # do not begin to avoid implicitly releasing table + # lock due to starting new transaction + return (d['id'], d) + except Exception as db_exception: + try: + self._safe_process_policy_update( + rule, policy_name, insert=False) + raise exception.PolicyRuntimeException( + "Error while writing to DB: %s." + % str(db_exception)) + except Exception as change_exception: + raise exception.PolicyRuntimeException( + "Error thrown during recovery from DB error. " + "Inconsistent state. DB error: %s. " + "New error: %s." % (str(db_exception), + str(change_exception))) + + # change not accepted means it was already there + raise exception.PolicyRuntimeException( + name='rule_already_exists') + finally: + # commit, unlock, and close db_session + if db_session: + db_api.commit_unlock_tables(session=db_session) + db_session.close() # Note(thread-safety): blocking function def persistent_delete_rule(self, id_, policy_name_or_id): diff --git a/congress/server/congress_server.py b/congress/server/congress_server.py index 1ea5bea9a..51353a25b 100644 --- a/congress/server/congress_server.py +++ b/congress/server/congress_server.py @@ -28,6 +28,7 @@ from oslo_log import log as logging from oslo_service import service from congress.common import config +from congress.db import api as db_api # FIXME It has to initialize distributed_architecture flag basing on the # config file before the python interpreter imports python file which has # if-statement for deepsix. Since the default value of the flag is False @@ -144,6 +145,12 @@ def main(): sys.exit("ERROR: Unable to find configuration file via default " "search paths ~/.congress/, ~/, /etc/congress/, /etc/) and " "the '--config-file' option!") + if cfg.CONF.replicated_policy_engine and not ( + db_api.is_mysql() or db_api.is_postgres()): + sys.exit("ERROR: replicated_policy_engine option can be used only with" + " MySQL or PostgreSQL database backends. Please set the " + "connection option in [database] section of congress.conf " + "to use a supported backend.") config.setup_logging() if not (cfg.CONF.api or cfg.CONF.policy_engine or cfg.CONF.datasources): diff --git a/congress/synchronizer/policy_rule_synchronizer.py b/congress/synchronizer/policy_rule_synchronizer.py index ac4ec3184..467685564 100644 --- a/congress/synchronizer/policy_rule_synchronizer.py +++ b/congress/synchronizer/policy_rule_synchronizer.py @@ -186,13 +186,16 @@ class PolicyRuleSynchronizer(object): @periodics.periodic(spacing=cfg.CONF.datasource_sync_period) @lockutils.synchronized('congress_synchronize_rules') - def synchronize_rules(self): + def synchronize_rules(self, db_session=None): + self.synchronize_rules_nonlocking(db_session=db_session) + + def synchronize_rules_nonlocking(self, db_session=None): LOG.debug("Synchronizing rules on node %s", self.node.node_id) try: # Read rules from DB. configured_rules = [] configured_facts = [] - for r in db_policy_rules.get_policy_rules(): + for r in db_policy_rules.get_policy_rules(session=db_session): if ':-' in r.rule: # if rule has body configured_rules.append({'rule': r.rule, 'id': r.id, diff --git a/devstack/plugin.sh b/devstack/plugin.sh index ae580a8d6..b4e405337 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -57,6 +57,7 @@ function configure_congress { iniset $CONGRESS_CONF oslo_policy policy_file $CONGRESS_POLICY_FILE iniset $CONGRESS_CONF DEFAULT auth_strategy $CONGRESS_AUTH_STRATEGY iniset $CONGRESS_CONF DEFAULT datasource_sync_period 30 + iniset $CONGRESS_CONF DEFAULT replicated_policy_engine "$CONGRESS_REPLICATED" # if [ "$CONGRESS_MULTIPROCESS_DEPLOYMENT" == "False" ]; then # iniset $CONGRESS_CONF DEFAULT transport_url $CONGRESS_TRANSPORT_URL diff --git a/doc/source/ha-deployment.rst b/doc/source/ha-deployment.rst index 18b01e2e4..21935b068 100644 --- a/doc/source/ha-deployment.rst +++ b/doc/source/ha-deployment.rst @@ -66,6 +66,13 @@ For example: [DEFAULT] transport_url = rabbit://:@:5672 +In addition, the ``replicated_policy_engine`` option should be set to ``True``. + +.. code-block:: text + + [DEFAULT] + replicated_policy_engine = True + All hosts should be configured with a database connection that points to the shared database deployed in step 1, not the local address shown in `separate install instructions`__.