Merge "Rework updating action executions heartbeats"
This commit is contained in:
commit
ba29d58335
|
@ -265,6 +265,10 @@ def create_or_update_action_execution(id, values):
|
|||
return IMPL.create_or_update_action_execution(id, values)
|
||||
|
||||
|
||||
def update_action_execution_heartbeat(id):
|
||||
return IMPL.update_action_execution_heartbeat(id)
|
||||
|
||||
|
||||
def delete_action_execution(id):
|
||||
return IMPL.delete_action_execution(id)
|
||||
|
||||
|
|
|
@ -784,6 +784,17 @@ def create_or_update_action_execution(id, values, session=None):
|
|||
return update_action_execution(id, values)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def update_action_execution_heartbeat(id, session=None):
|
||||
if not id:
|
||||
raise exc.DBEntityNotFoundError
|
||||
|
||||
now = utils.utc_now_sec()
|
||||
session.query(models.ActionExecution).\
|
||||
filter(models.ActionExecution.id == id).\
|
||||
update({'last_heartbeat': now})
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def delete_action_execution(id, session=None):
|
||||
count = _secure_query(models.ActionExecution).filter(
|
||||
|
|
|
@ -225,15 +225,9 @@ class DefaultEngine(base.Engine):
|
|||
@post_tx_queue.run
|
||||
def report_running_actions(self, action_ex_ids):
|
||||
with db_api.transaction():
|
||||
now = u.utc_now_sec()
|
||||
|
||||
for exec_id in action_ex_ids:
|
||||
try:
|
||||
db_api.update_action_execution(
|
||||
exec_id,
|
||||
{"last_heartbeat": now},
|
||||
insecure=True
|
||||
)
|
||||
db_api.update_action_execution_heartbeat(exec_id)
|
||||
except exceptions.DBEntityNotFoundError:
|
||||
LOG.debug("Action execution heartbeat update failed. {}"
|
||||
.format(exec_id), exc_info=True)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
import copy
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
|
@ -1486,6 +1487,27 @@ class ActionExecutionTest(SQLAlchemyTest):
|
|||
|
||||
self.assertEqual(updated, fetched)
|
||||
|
||||
def test_update_action_execution_heartbeat(self):
|
||||
with db_api.transaction():
|
||||
created = db_api.create_action_execution(ACTION_EXECS[0])
|
||||
created_last_heartbeat = created.last_heartbeat
|
||||
|
||||
fetched = db_api.get_action_execution(created.id)
|
||||
fetched_last_heartbeat = fetched.last_heartbeat
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
self.assertEqual(created_last_heartbeat, fetched_last_heartbeat)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
db_api.update_action_execution_heartbeat(created.id)
|
||||
|
||||
fetched = db_api.get_action_execution(created.id)
|
||||
fetched_last_heartbeat = fetched.last_heartbeat
|
||||
|
||||
self.assertIsNot(created_last_heartbeat, fetched_last_heartbeat)
|
||||
|
||||
def test_get_action_executions(self):
|
||||
with db_api.transaction():
|
||||
created0 = db_api.create_action_execution(WF_EXECS[0])
|
||||
|
|
Loading…
Reference in New Issue