Rework updating action executions heartbeats
New function update_action_execution_heartbeat was introduced in db api to optimize updating of heartbeats. Now to perform the update mistral downloads an action execution object, then modifies it, and finally uploads back to the db using ORM. This function removes initial object fetching and changes ORM update to sqlalchemy core. Change-Id: I4cb2d85c193531648540a37caca9ee868ede3a08
This commit is contained in:
parent
55c7990684
commit
e5031e26f1
|
@ -265,6 +265,10 @@ def create_or_update_action_execution(id, values):
|
|||
return IMPL.create_or_update_action_execution(id, values)
|
||||
|
||||
|
||||
def update_action_execution_heartbeat(id):
|
||||
return IMPL.update_action_execution_heartbeat(id)
|
||||
|
||||
|
||||
def delete_action_execution(id):
|
||||
return IMPL.delete_action_execution(id)
|
||||
|
||||
|
|
|
@ -784,6 +784,17 @@ def create_or_update_action_execution(id, values, session=None):
|
|||
return update_action_execution(id, values)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def update_action_execution_heartbeat(id, session=None):
|
||||
if not id:
|
||||
raise exc.DBEntityNotFoundError
|
||||
|
||||
now = utils.utc_now_sec()
|
||||
session.query(models.ActionExecution).\
|
||||
filter(models.ActionExecution.id == id).\
|
||||
update({'last_heartbeat': now})
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def delete_action_execution(id, session=None):
|
||||
count = _secure_query(models.ActionExecution).filter(
|
||||
|
|
|
@ -225,15 +225,9 @@ class DefaultEngine(base.Engine):
|
|||
@post_tx_queue.run
|
||||
def report_running_actions(self, action_ex_ids):
|
||||
with db_api.transaction():
|
||||
now = u.utc_now_sec()
|
||||
|
||||
for exec_id in action_ex_ids:
|
||||
try:
|
||||
db_api.update_action_execution(
|
||||
exec_id,
|
||||
{"last_heartbeat": now},
|
||||
insecure=True
|
||||
)
|
||||
db_api.update_action_execution_heartbeat(exec_id)
|
||||
except exceptions.DBEntityNotFoundError:
|
||||
LOG.debug("Action execution heartbeat update failed. {}"
|
||||
.format(exec_id), exc_info=True)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
import copy
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
|
@ -1486,6 +1487,27 @@ class ActionExecutionTest(SQLAlchemyTest):
|
|||
|
||||
self.assertEqual(updated, fetched)
|
||||
|
||||
def test_update_action_execution_heartbeat(self):
|
||||
with db_api.transaction():
|
||||
created = db_api.create_action_execution(ACTION_EXECS[0])
|
||||
created_last_heartbeat = created.last_heartbeat
|
||||
|
||||
fetched = db_api.get_action_execution(created.id)
|
||||
fetched_last_heartbeat = fetched.last_heartbeat
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
self.assertEqual(created_last_heartbeat, fetched_last_heartbeat)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
db_api.update_action_execution_heartbeat(created.id)
|
||||
|
||||
fetched = db_api.get_action_execution(created.id)
|
||||
fetched_last_heartbeat = fetched.last_heartbeat
|
||||
|
||||
self.assertIsNot(created_last_heartbeat, fetched_last_heartbeat)
|
||||
|
||||
def test_get_action_executions(self):
|
||||
with db_api.transaction():
|
||||
created0 = db_api.create_action_execution(WF_EXECS[0])
|
||||
|
|
Loading…
Reference in New Issue