From 62862ca0b72ca0b91a2b6e598ea55fec084f57d8 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Wed, 21 Dec 2016 12:30:34 +0700 Subject: [PATCH] Refresh object state after lock acquisition in WithItemsTask * We need to refresh task state once we acquired the 'with-items' lock to prevent modifying stale values cached by SQLAlchemy session. For this purpose, new DB API method refresh() was added. Without this change 'with-items' task doesn't work properly in case of using multiple engines. Change-Id: Id5a1d2cf03090279e6df3805562e00d300b89953 Closes-Bug: #1640378 Partial-Bug: #1640379 --- mistral/db/v2/api.py | 4 ++++ mistral/db/v2/sqlalchemy/api.py | 5 +++++ mistral/engine/tasks.py | 8 ++++++++ 3 files changed, 17 insertions(+) diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index f0ce2d43..8c4170de 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -58,6 +58,10 @@ def transaction(): yield +def refresh(model): + IMPL.refresh(model) + + # Locking. diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index f2fcd61b..e8bb9f72 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -115,6 +115,11 @@ def transaction(): end_tx() +@b.session_aware() +def refresh(model, session=None): + session.refresh(model) + + @b.session_aware() def acquire_lock(model, id, session=None): # Expire all so all objects queried after lock is acquired diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index c300772e..52a8a861 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -438,6 +438,14 @@ class WithItemsTask(RegularTask): # from modifying runtime context simultaneously by multiple # transactions. with db_api.named_lock('with-items-%s' % self.task_ex.id): + # NOTE: We need to refresh task execution object right + # after the lock is acquired to make sure that we're + # working with a fresh state of its runtime context. + # Otherwise, SQLAlchemy session can contain a stale + # cached version of it so that we don't modify actual + # values (i.e. capacity). + db_api.refresh(self.task_ex) + self._on_action_complete(action_ex) def _on_action_complete(self, action_ex):