diff --git a/senlin/db/api.py b/senlin/db/api.py index a6a08477f..88d52ae22 100644 --- a/senlin/db/api.py +++ b/senlin/db/api.py @@ -384,6 +384,10 @@ def action_acquire_random_ready(context, owner, timestamp): return IMPL.action_acquire_random_ready(context, owner, timestamp) +def action_acquire_first_ready(context, owner, timestamp): + return IMPL.action_acquire_first_ready(context, owner, timestamp) + + def action_abandon(context, action_id, values=None): return IMPL.action_abandon(context, action_id, values) diff --git a/senlin/db/sqlalchemy/api.py b/senlin/db/sqlalchemy/api.py index b2b8b3d4e..1dcf12309 100644 --- a/senlin/db/sqlalchemy/api.py +++ b/senlin/db/sqlalchemy/api.py @@ -1217,6 +1217,31 @@ def action_acquire_random_ready(context, owner, timestamp): return action +def _action_acquire_ready(session, owner, timestamp, order=None): + action = session.query(models.Action).\ + filter_by(status=consts.ACTION_READY).\ + filter_by(owner=None).\ + order_by(order or func.random()).\ + with_for_update().first() + + if action: + action.owner = owner + action.start_time = timestamp + action.status = consts.ACTION_RUNNING + action.status_reason = _('The action is being processed.') + action.save(session) + + return action + + +@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True, + retry_interval=0.5, inc_retry_interval=True) +def action_acquire_first_ready(context, owner, timestamp): + with session_for_write() as session: + return _action_acquire_ready(session, owner, timestamp, + consts.ACTION_CREATED_AT) + + def action_abandon(context, action_id, values=None): '''Abandon an action for other workers to execute again. diff --git a/senlin/objects/action.py b/senlin/objects/action.py index a258885fe..e6fcc14ea 100644 --- a/senlin/objects/action.py +++ b/senlin/objects/action.py @@ -129,6 +129,10 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat): def acquire_random_ready(cls, context, owner, timestamp): return db_api.action_acquire_random_ready(context, owner, timestamp) + @classmethod + def acquire_first_ready(cls, context, owner, timestamp): + return db_api.action_acquire_first_ready(context, owner, timestamp) + @classmethod def abandon(cls, context, action_id, values=None): return db_api.action_abandon(context, action_id, values) diff --git a/senlin/tests/unit/db/test_action_api.py b/senlin/tests/unit/db/test_action_api.py index dee53774a..e75e18621 100644 --- a/senlin/tests/unit/db/test_action_api.py +++ b/senlin/tests/unit/db/test_action_api.py @@ -10,10 +10,10 @@ # License for the specific language governing permissions and limitations # under the License. +import six import time -import six - +from oslo_utils import timeutils as tu from senlin.common import consts from senlin.common import exception from senlin.db.sqlalchemy import api as db_api @@ -156,6 +156,37 @@ class DBAPIActionTest(base.SenlinTestCase): retobj = db_api.action_get(new_ctx, action.id, project_safe=False) self.assertIsNotNone(retobj) + def test_acquire_first_ready_none(self): + data = {'created_at': tu.utcnow(True)} + + _create_action(self.ctx, **data) + result = db_api.action_acquire_first_ready(self.ctx, 'fake_o', + tu.utcnow(True)) + self.assertIsNone(result) + + def test_acquire_first_ready_one(self): + data = {'created_at': tu.utcnow(True)} + _create_action(self.ctx, **data) + + result = db_api.action_acquire_first_ready(self.ctx, 'fake_o', + tu.utcnow(True)) + self.assertIsNone(result) + + def test_acquire_first_ready_mult(self): + data = { + 'created_at': tu.utcnow(True), + 'status': 'READY', + } + action1 = _create_action(self.ctx, **data) + time.sleep(1) + + data['created_at'] = tu.utcnow(True) + _create_action(self.ctx, **data) + + result = db_api.action_acquire_first_ready(self.ctx, 'fake_o', + time.time()) + self.assertEqual(action1.id, result.id) + def test_action_acquire_random_ready(self): specs = [ {'name': 'A01', 'status': 'INIT'}, @@ -171,6 +202,7 @@ class DBAPIActionTest(base.SenlinTestCase): timestamp = time.time() action = db_api.action_acquire_random_ready(self.ctx, worker, timestamp) + self.assertIn(action.name, ('A02', 'A04')) self.assertEqual('worker2', action.owner) self.assertEqual(consts.ACTION_RUNNING, action.status)