From 05252da83572fb487054d03453da7494f3077f02 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Mon, 22 Jan 2018 15:50:09 +0700 Subject: [PATCH] Added session.flush() before update_on_match() * It turns out that update_on_match() from oslo_db breaks fields of a persistent object (attached to the session) in some cases. For example, when we use PostgreSQL as a DB it doesn't properly merge fields of the given specimen onto the fields of the object attached to the SQLAlchemy session. For some reason, it cleans up the history of ORM events registered before this operation so that if some update took place in this session and they are not flushed yet to DB they simply get lost. Hence adding flush() on the session right before this operation helps. * NOTE: this is rather a workaround than a fix of the root cause. Root cause still needs to be found. Change-Id: I546badd8b1c96b68567287fc9d38b07738272503 Closes-Bug: #1736821 --- mistral/cmd/launch.py | 1 + mistral/db/v2/sqlalchemy/api.py | 101 ++++++++++++++++---------------- mistral/workflow/data_flow.py | 4 +- 3 files changed, 53 insertions(+), 53 deletions(-) diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index d9954f407..88926a8dc 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -129,6 +129,7 @@ MISTRAL_TITLE = """ || \\/ || \\\ || || || \\\ || || || || \\\ || || || /\\\ || || || || __// ||_// || \\\__// \\\_ || + Mistral Workflow Service, version %s """ % version.version_string() diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 3ee4ce818..a750fe36d 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -143,6 +143,51 @@ def _lock_entity(model, id): return _secure_query(model).with_for_update().filter(model.id == id).one() +@b.session_aware() +def update_on_match(id, specimen, values, session=None): + """Updates a model with the given values if it matches the given specimen. + + :param id: ID of a persistent model. + :param specimen: Specimen used to match the + :param values: Values to set to the model if fields of the object + match the specimen. + :param session: Session. + :return: Persistent object attached to the session. + """ + + assert id is not None + assert specimen is not None + + # We need to flush the session because when we do update_on_match() + # it doesn't always update the state of the persistent object properly + # when it merges a specimen state into it. Some fields get wiped out from + # the history of ORM events that must be flushed later. For example, it + # doesn't work well in case of Postgres. + # See https://bugs.launchpad.net/mistral/+bug/1736821 + session.flush() + + model = None + model_class = type(specimen) + + # Use WHERE clause to exclude possible conflicts if the state has + # already been changed. + try: + model = b.model_query(model_class).update_on_match( + specimen=specimen, + surrogate_key='id', + values=values + ) + except oslo_sqlalchemy.update_match.NoRowsMatched: + LOG.info( + "Can't change state of persistent object " + "because it has already been changed. [model_class=%, id=%s, " + "specimen=%s, values=%s]", + model_class, id, specimen, values + ) + + return model + + def _secure_query(model, *columns): query = b.model_query(model, columns) @@ -831,32 +876,10 @@ def delete_workflow_executions(session=None, **kwargs): return _delete_all(models.WorkflowExecution, **kwargs) -@b.session_aware() -def update_workflow_execution_state(id, cur_state, state, session=None): - wf_ex = None +def update_workflow_execution_state(id, cur_state, state): + specimen = models.WorkflowExecution(id=id, state=cur_state) - # Use WHERE clause to exclude possible conflicts if the state has - # already been changed. - try: - specimen = models.WorkflowExecution( - id=id, - state=cur_state - ) - - wf_ex = b.model_query( - models.WorkflowExecution).update_on_match( - specimen=specimen, - surrogate_key='id', - values={'state': state} - ) - except oslo_sqlalchemy.update_match.NoRowsMatched: - LOG.info( - "Can't change workflow execution state from %s to %s, " - "because it has already been changed. [execution_id=%s]", - cur_state, state, id - ) - - return wf_ex + return update_on_match(id, specimen, {'state': state}) # Tasks executions. @@ -987,32 +1010,10 @@ def delete_task_executions(session=None, **kwargs): return _delete_all(models.TaskExecution, **kwargs) -@b.session_aware() -def update_task_execution_state(id, cur_state, state, session=None): - wf_ex = None +def update_task_execution_state(id, cur_state, state): + specimen = models.TaskExecution(id=id, state=cur_state) - # Use WHERE clause to exclude possible conflicts if the state has - # already been changed. - try: - specimen = models.TaskExecution( - id=id, - state=cur_state - ) - - wf_ex = b.model_query( - models.TaskExecution).update_on_match( - specimen=specimen, - surrogate_key='id', - values={'state': state} - ) - except oslo_sqlalchemy.update_match.NoRowsMatched: - LOG.info( - "Can't change task execution state from %s to %s, " - "because it has already been changed. [execution_id=%s]", - cur_state, state, id - ) - - return wf_ex + return update_on_match(id, specimen, {'state': state}) # Delayed calls. diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 9d077bde5..0cfc24261 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -285,9 +285,7 @@ def add_openstack_data_to_context(wf_ex): def add_execution_to_context(wf_ex): wf_ex.context = wf_ex.context or {} - wf_ex.context['__execution'] = { - 'id': wf_ex.id - } + wf_ex.context['__execution'] = {'id': wf_ex.id} def add_environment_to_context(wf_ex):