Merge "Lookup a random action to execute"
This commit is contained in:
commit
9451d3a577
@ -368,8 +368,8 @@ def action_acquire(context, action_id, owner, timestamp):
|
||||
return IMPL.action_acquire(context, action_id, owner, timestamp)
|
||||
|
||||
|
||||
def action_acquire_1st_ready(context, owner, timestamp):
|
||||
return IMPL.action_acquire_1st_ready(context, owner, timestamp)
|
||||
def action_acquire_random_ready(context, owner, timestamp):
|
||||
return IMPL.action_acquire_random_ready(context, owner, timestamp)
|
||||
|
||||
|
||||
def action_abandon(context, action_id):
|
||||
|
@ -28,6 +28,7 @@ from oslo_utils import timeutils
|
||||
import osprofiler.sqlalchemy
|
||||
import sqlalchemy
|
||||
from sqlalchemy.orm import joinedload_all
|
||||
from sqlalchemy.sql.expression import func
|
||||
|
||||
from senlin.common import consts
|
||||
from senlin.common import exception
|
||||
@ -1136,11 +1137,12 @@ def action_acquire(context, action_id, owner, timestamp):
|
||||
|
||||
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
|
||||
retry_interval=0.5, inc_retry_interval=True)
|
||||
def action_acquire_1st_ready(context, owner, timestamp):
|
||||
def action_acquire_random_ready(context, owner, timestamp):
|
||||
with session_for_write() as session:
|
||||
action = session.query(models.Action).\
|
||||
filter_by(status=consts.ACTION_READY).\
|
||||
filter_by(owner=None).\
|
||||
order_by(func.random()).\
|
||||
with_for_update().first()
|
||||
|
||||
if action:
|
||||
|
@ -117,8 +117,8 @@ class ThreadGroupManager(object):
|
||||
batch_interval = cfg.CONF.batch_interval
|
||||
while True:
|
||||
timestamp = wallclock()
|
||||
action = ao.Action.acquire_1st_ready(self.db_session, worker_id,
|
||||
timestamp)
|
||||
action = ao.Action.acquire_random_ready(self.db_session, worker_id,
|
||||
timestamp)
|
||||
if action:
|
||||
if batch_size > 0 and 'NODE' in action.action:
|
||||
if actions_launched < batch_size:
|
||||
|
@ -97,8 +97,8 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
|
||||
return db_api.action_acquire(context, action_id, owner, timestamp)
|
||||
|
||||
@classmethod
|
||||
def acquire_1st_ready(cls, context, owner, timestamp):
|
||||
return db_api.action_acquire_1st_ready(context, owner, timestamp)
|
||||
def acquire_random_ready(cls, context, owner, timestamp):
|
||||
return db_api.action_acquire_random_ready(context, owner, timestamp)
|
||||
|
||||
@classmethod
|
||||
def abandon(cls, context, action_id):
|
||||
|
@ -103,7 +103,7 @@ class DBAPIActionTest(base.SenlinTestCase):
|
||||
retobj = db_api.action_get(new_ctx, action.id, project_safe=True)
|
||||
self.assertIsNotNone(retobj)
|
||||
|
||||
def test_action_acquire_1st_ready(self):
|
||||
def test_action_acquire_random_ready(self):
|
||||
specs = [
|
||||
{'name': 'A01', 'status': 'INIT'},
|
||||
{'name': 'A02', 'status': 'READY', 'owner': 'worker1'},
|
||||
@ -116,8 +116,9 @@ class DBAPIActionTest(base.SenlinTestCase):
|
||||
|
||||
worker = 'worker2'
|
||||
timestamp = time.time()
|
||||
action = db_api.action_acquire_1st_ready(self.ctx, worker, timestamp)
|
||||
self.assertEqual('A04', action.name)
|
||||
action = db_api.action_acquire_random_ready(self.ctx, worker,
|
||||
timestamp)
|
||||
self.assertIn(action.name, ('A02', 'A04'))
|
||||
self.assertEqual('worker2', action.owner)
|
||||
self.assertEqual(consts.ACTION_RUNNING, action.status)
|
||||
self.assertEqual(timestamp, action.start_time)
|
||||
|
@ -90,7 +90,7 @@ class SchedulerTest(base.SenlinTestCase):
|
||||
oslo_context.get_current(),
|
||||
None, f)
|
||||
|
||||
@mock.patch.object(db_api, 'action_acquire_1st_ready')
|
||||
@mock.patch.object(db_api, 'action_acquire_random_ready')
|
||||
@mock.patch.object(db_api, 'action_acquire')
|
||||
def test_start_action(self, mock_action_acquire,
|
||||
mock_action_acquire_1st):
|
||||
@ -113,7 +113,7 @@ class SchedulerTest(base.SenlinTestCase):
|
||||
self.assertEqual(mock_thread, tgm.workers['0123'])
|
||||
mock_thread.link.assert_called_once_with(mock.ANY, '0123')
|
||||
|
||||
@mock.patch.object(db_api, 'action_acquire_1st_ready')
|
||||
@mock.patch.object(db_api, 'action_acquire_random_ready')
|
||||
def test_start_action_no_action_id(self, mock_acquire_action):
|
||||
mock_action = mock.Mock()
|
||||
mock_action.id = '0123'
|
||||
@ -135,7 +135,7 @@ class SchedulerTest(base.SenlinTestCase):
|
||||
mock_thread.link.assert_called_once_with(mock.ANY, '0123')
|
||||
|
||||
@mock.patch.object(scheduler, 'sleep')
|
||||
@mock.patch.object(db_api, 'action_acquire_1st_ready')
|
||||
@mock.patch.object(db_api, 'action_acquire_random_ready')
|
||||
def test_start_action_batch_control(self, mock_acquire_action, mock_sleep):
|
||||
mock_action1 = mock.Mock()
|
||||
mock_action1.id = 'ID1'
|
||||
@ -158,7 +158,7 @@ class SchedulerTest(base.SenlinTestCase):
|
||||
|
||||
mock_sleep.assert_called_once_with(3)
|
||||
|
||||
@mock.patch.object(db_api, 'action_acquire_1st_ready')
|
||||
@mock.patch.object(db_api, 'action_acquire_random_ready')
|
||||
@mock.patch.object(db_api, 'action_acquire')
|
||||
def test_start_action_failed_locking_action(self, mock_acquire_action,
|
||||
mock_acquire_action_1st):
|
||||
@ -171,7 +171,7 @@ class SchedulerTest(base.SenlinTestCase):
|
||||
res = tgm.start_action('4567', '0123')
|
||||
self.assertIsNone(res)
|
||||
|
||||
@mock.patch.object(db_api, 'action_acquire_1st_ready')
|
||||
@mock.patch.object(db_api, 'action_acquire_random_ready')
|
||||
def test_start_action_no_action_ready(self, mock_acquire_action):
|
||||
mock_acquire_action.return_value = None
|
||||
mock_group = mock.Mock()
|
||||
|
Loading…
Reference in New Issue
Block a user