DB support for 'action_acquire_first_ready'

will try to get the actions which status are 'READY' ordered by
'created_at' field.

Change-Id: I2bce0dbb6c58ef90b475a5b633e7e2843608508f
This commit is contained in:
ruijie 2017-08-16 03:44:08 -07:00
parent b2af6ca257
commit 21b958a6ca
4 changed files with 67 additions and 2 deletions

View File

@ -384,6 +384,10 @@ def action_acquire_random_ready(context, owner, timestamp):
return IMPL.action_acquire_random_ready(context, owner, timestamp)
def action_acquire_first_ready(context, owner, timestamp):
return IMPL.action_acquire_first_ready(context, owner, timestamp)
def action_abandon(context, action_id, values=None):
return IMPL.action_abandon(context, action_id, values)

View File

@ -1217,6 +1217,31 @@ def action_acquire_random_ready(context, owner, timestamp):
return action
def _action_acquire_ready(session, owner, timestamp, order=None):
action = session.query(models.Action).\
filter_by(status=consts.ACTION_READY).\
filter_by(owner=None).\
order_by(order or func.random()).\
with_for_update().first()
if action:
action.owner = owner
action.start_time = timestamp
action.status = consts.ACTION_RUNNING
action.status_reason = _('The action is being processed.')
action.save(session)
return action
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
retry_interval=0.5, inc_retry_interval=True)
def action_acquire_first_ready(context, owner, timestamp):
with session_for_write() as session:
return _action_acquire_ready(session, owner, timestamp,
consts.ACTION_CREATED_AT)
def action_abandon(context, action_id, values=None):
'''Abandon an action for other workers to execute again.

View File

@ -129,6 +129,10 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
def acquire_random_ready(cls, context, owner, timestamp):
return db_api.action_acquire_random_ready(context, owner, timestamp)
@classmethod
def acquire_first_ready(cls, context, owner, timestamp):
return db_api.action_acquire_first_ready(context, owner, timestamp)
@classmethod
def abandon(cls, context, action_id, values=None):
return db_api.action_abandon(context, action_id, values)

View File

@ -10,10 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
import time
import six
from oslo_utils import timeutils as tu
from senlin.common import consts
from senlin.common import exception
from senlin.db.sqlalchemy import api as db_api
@ -156,6 +156,37 @@ class DBAPIActionTest(base.SenlinTestCase):
retobj = db_api.action_get(new_ctx, action.id, project_safe=False)
self.assertIsNotNone(retobj)
def test_acquire_first_ready_none(self):
data = {'created_at': tu.utcnow(True)}
_create_action(self.ctx, **data)
result = db_api.action_acquire_first_ready(self.ctx, 'fake_o',
tu.utcnow(True))
self.assertIsNone(result)
def test_acquire_first_ready_one(self):
data = {'created_at': tu.utcnow(True)}
_create_action(self.ctx, **data)
result = db_api.action_acquire_first_ready(self.ctx, 'fake_o',
tu.utcnow(True))
self.assertIsNone(result)
def test_acquire_first_ready_mult(self):
data = {
'created_at': tu.utcnow(True),
'status': 'READY',
}
action1 = _create_action(self.ctx, **data)
time.sleep(1)
data['created_at'] = tu.utcnow(True)
_create_action(self.ctx, **data)
result = db_api.action_acquire_first_ready(self.ctx, 'fake_o',
time.time())
self.assertEqual(action1.id, result.id)
def test_action_acquire_random_ready(self):
specs = [
{'name': 'A01', 'status': 'INIT'},
@ -171,6 +202,7 @@ class DBAPIActionTest(base.SenlinTestCase):
timestamp = time.time()
action = db_api.action_acquire_random_ready(self.ctx, worker,
timestamp)
self.assertIn(action.name, ('A02', 'A04'))
self.assertEqual('worker2', action.owner)
self.assertEqual(consts.ACTION_RUNNING, action.status)