Implement timelimit mechanism based on tracking of tasks

timelimit should protect from hung tasks or from tasks that run longer than their allotted time

Change-Id: I0a9263aac481b28e3cef47c0ff83ae415bdcb22b
Dmitry Shulyak 2016-01-21 14:25:12 +02:00
parent 4541230959
commit f98c56cf15
14 changed files with 198 additions and 29 deletions

View File

@ -33,6 +33,7 @@ C.log_file = 'solar.log'
C.system_log_address = 'ipc:///tmp/solar_system_log'
C.tasks_address = 'ipc:///tmp/solar_tasks'
C.scheduler_address = 'ipc:///tmp/solar_scheduler'
C.timewatcher_address = 'ipc:///tmp/solar_timewatcher'
def _lookup_vals(setter, config, prefix=None):

View File

@ -1041,6 +1041,7 @@ class Task(Model):
task_type = Field(basestring)
args = Field(list)
errmsg = Field(basestring, default=str)
timelimit = Field(int, default=int)
execution = IndexedField(basestring)
parents = ParentField(default=list)
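
The new timelimit field follows the model's callable-default convention (compare errmsg above with default=str): the default is produced by calling the given type, so a task without an explicit limit gets 0, which the workers treat as "no limit". A standalone sketch of that pattern, illustrative rather than the actual dblayer code:

def field_default(default):
    # dblayer-style Fields take a callable so each new Task gets a
    # fresh default instead of a shared mutable value
    return default() if callable(default) else default

assert field_default(int) == 0     # timelimit: watchdog disabled
assert field_default(str) == ''    # errmsg: no error recorded
assert field_default(list) == []   # parents: no dependencies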

View File

@ -27,6 +27,7 @@ SCHEDULER_CLIENT = Client(C.scheduler_address)
def construct_scheduler(tasks_address, scheduler_address):
scheduler = wscheduler.Scheduler(Client(tasks_address))
scheduler_executor = Executor(scheduler, scheduler_address)
scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
Executor(scheduler, scheduler_address).run()
@ -44,6 +45,8 @@ def construct_tasks(system_log_address, tasks_address, scheduler_address):
scheduler = wscheduler.SchedulerCallbackClient(
Client(scheduler_address))
tasks = Tasks()
tasks_executor = Executor(tasks, tasks_address)
tasks.for_all.before(tasks_executor.register)
tasks.for_all.on_success(syslog.commit)
tasks.for_all.on_error(syslog.error)
tasks.for_all.on_success(scheduler.update)

View File

@ -20,19 +20,22 @@ import zerorpc
from solar.core.log import log
class ImprovedPuller(zerorpc.Puller):
class PoolBasedPuller(zerorpc.Puller):
"""ImprovedPuller allows to control pool of gevent threads and
track assignments of gevent threads
"""
def __init__(self, pool_size=100, *args, **kwargs):
# TODO put pool_size into config for each worker
self._tasks_pool = gevent.pool.Pool(pool_size)
super(ImprovedPuller, self).__init__(*args, **kwargs)
super(PoolBasedPuller, self).__init__(*args, **kwargs)
def _receiver(self):
while True:
event = self._events.recv()
self._tasks_pool.spawn(self._async_event, event)
self._handle_event(event)
def _handle_event(self, event):
self._tasks_pool.spawn(self._async_event, event)
def _async_event(self, event):
try:
@ -55,11 +58,26 @@ class ImprovedPuller(zerorpc.Puller):
def run(self):
try:
super(ImprovedPuller, self).run()
super(PoolBasedPuller, self).run()
finally:
self._tasks_pool.join(raise_error=True)
class LimitedExecutionPuller(PoolBasedPuller):
def _handle_event(self, event):
ctxt = event.args[0]
timelimit = ctxt.get('timelimit', 0)
if timelimit and 'kill' in self._methods:
# a greenlet that interrupts pool-based greenlets shouldn't
# share a pool with them; otherwise, with a small pool, it
# might never get a chance to run before the deadline passes
gevent.spawn_later(
timelimit, self._methods['kill'], ctxt, ctxt['task_id'])
self._tasks_pool.spawn(self._async_event, event)
class Executor(object):
def __init__(self, worker, bind_to):
@ -79,7 +97,7 @@ class Executor(object):
self._tasks_register.pop(task_id)
def run(self):
server = ImprovedPuller(methods=self.worker)
server = LimitedExecutionPuller(methods=self.worker)
server.bind(self.bind_to)
server.run()
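
The comment in _handle_event above captures the key design point: the watchdog is started with gevent.spawn_later rather than through the task pool, since a saturated pool could delay it past the very deadline it enforces. A minimal standalone sketch of the same pattern, with illustrative names rather than Solar's actual API:

import gevent
import gevent.pool


class ExecutionTimeout(Exception):
    pass


pool = gevent.pool.Pool(2)   # task greenlets share this bounded pool
running = {}                 # task_id -> greenlet, as in _tasks_register


def run_task(task_id, seconds):
    running[task_id] = gevent.getcurrent()
    try:
        gevent.sleep(seconds)          # stands in for real work
        print('%s finished' % task_id)
    except ExecutionTimeout:
        print('%s killed by timelimit' % task_id)


def kill(task_id):
    glet = running.get(task_id)
    if glet is not None:
        glet.kill(ExecutionTimeout, block=False)


pool.spawn(run_task, 't1', 10)
gevent.spawn_later(1, kill, 't1')  # watchdog lives outside the pool
pool.join()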

View File

@ -39,7 +39,8 @@ def save_graph(graph):
'target': graph.node[n].get('target', '') or '',
'task_type': graph.node[n].get('type', ''),
'args': graph.node[n].get('args', []),
'errmsg': graph.node[n].get('errmsg', '') or ''})
'errmsg': graph.node[n].get('errmsg', '') or '',
'timelimit': graph.node[n].get('timelimit', 0)})
graph.node[n]['task'] = t
for pred in graph.predecessors(n):
pred_task = graph.node[pred]['task']
@ -77,7 +78,8 @@ def get_graph(uid):
type=t.task_type, args=t.args,
target=t.target or None,
errmsg=t.errmsg or None,
task=t)
task=t,
timelimit=t.timelimit or 0)
for u in t.parents.all_names():
dg.add_edge(u, t.name)
return dg

View File

@ -1 +1,20 @@
#
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from solar.orchestration.workers.scheduler import Scheduler
from solar.orchestration.workers.scheduler import SchedulerCallbackClient
from solar.orchestration.workers.system_log import SystemLog
from solar.orchestration.workers.tasks import Tasks

View File

@ -46,10 +46,19 @@ class Scheduler(base.Worker):
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
task_type = dg.node[task_name]['type']
dg.node[task_name]['status'] = 'INPROGRESS'
ctxt = {'task_id': task_id, 'task_name': task_name}
timelimit = dg.node[task_name].get('timelimit', 0)
ctxt = {
'task_id': task_id,
'task_name': task_name,
'plan_uid': plan_uid,
'timelimit': timelimit}
self._tasks(
task_type, ctxt,
*dg.node[task_name]['args'])
if timelimit:
log.debug(
'Timelimit for task %s will be %s',
task_id, timelimit)
graph.update_graph(dg)
log.debug('Scheduled tasks %r', rst)
# process tasks with tasks client
@ -64,6 +73,9 @@ class Scheduler(base.Worker):
graph.update_graph(dg)
def update_next(self, ctxt, status, errmsg):
log.debug(
'Received update for TASK %s - %s %s',
ctxt['task_id'], status, errmsg)
plan_uid, task_name = ctxt['task_id'].rsplit(':', 1)
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
dg = graph.get_graph(plan_uid)
@ -75,10 +87,19 @@ class Scheduler(base.Worker):
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
task_type = dg.node[task_name]['type']
dg.node[task_name]['status'] = 'INPROGRESS'
ctxt = {'task_id': task_id, 'task_name': task_name}
timelimit = dg.node[task_name].get('timelimit', 0)
ctxt = {
'task_id': task_id,
'task_name': task_name,
'plan_uid': plan_uid,
'timelimit': timelimit}
self._tasks(
task_type, ctxt,
*dg.node[task_name]['args'])
if timelimit:
log.debug(
'Timelimit for task %s will be %s',
task_id, timelimit)
graph.update_graph(dg)
log.debug('Scheduled tasks %r', rst)
return rst
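
Note that the timelimit travels inside the task's ctxt, so the receiving LimitedExecutionPuller can arm its watchdog without another database read. The payload handed to the tasks worker looks roughly like this (values illustrative):

ctxt = {
    'task_id': 'some-plan-uid:t1',  # '<plan_uid>:<task_name>'
    'task_name': 't1',
    'plan_uid': 'some-plan-uid',
    'timelimit': 1,                 # seconds; 0 means no watchdog
}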

View File

@ -17,13 +17,17 @@ import time
from solar.core import actions
from solar.core.log import log
from solar.core import resource
from solar.errors import ExecutionTimeout
from solar.orchestration.workers import base
class Tasks(base.Worker):
def sleep(self, ctxt, seconds):
return time.sleep(seconds)
log.debug('Received sleep for %s', seconds)
time.sleep(seconds)
log.debug('Finished sleep %s', seconds)
return None
def error(self, ctxt, message):
raise Exception(message)
@ -36,3 +40,10 @@ class Tasks(base.Worker):
resource_name, action)
res = resource.load(resource_name)
return actions.resource_action(res, action)
def kill(self, ctxt, task_id):
log.debug('Received kill request for task_id %s', task_id)
if not hasattr(self._executor, 'kill'):
raise NotImplementedError(
'Current executor does not support interrupting tasks')
self._executor.kill(task_id, ExecutionTimeout)
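
kill itself only delegates: the Executor tracks which greenlet runs which task (tasks.for_all.before(tasks_executor.register) wires this up, and the entry is popped on completion, as seen in the Executor diff above). A rough sketch of that register/kill pairing, with internals assumed where the diff does not show them:

import gevent


class SketchExecutor(object):

    def __init__(self):
        self._tasks_register = {}  # task_id -> greenlet

    def register(self, ctxt):
        # for_all.before hook: runs inside the task's own greenlet,
        # so getcurrent() is the greenlet to kill later
        self._tasks_register[ctxt['task_id']] = gevent.getcurrent()

    def kill(self, task_id, exc_class):
        # raise exc_class (e.g. ExecutionTimeout) inside the greenlet
        if task_id in self._tasks_register:
            self._tasks_register.pop(task_id).kill(exc_class, block=False)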

View File

@ -109,6 +109,11 @@ def two_path_plan():
return plan_from_fixture('two_path')
@pytest.fixture
def timelimit_plan():
return plan_from_fixture('timelimit')
@pytest.fixture
def sequence_vr(tmpdir):
base_path = os.path.join(

View File

@ -22,3 +22,18 @@ import pytest
def address():
return 'ipc:///tmp/solar_test_' + ''.join(
(random.choice(string.ascii_lowercase) for x in xrange(4)))
@pytest.fixture
def tasks_address(address):
return address + 'tasks'
@pytest.fixture
def system_log_address(address):
return address + 'system_log'
@pytest.fixture
def scheduler_address(address):
return address + 'scheduler'

View File

@ -19,6 +19,7 @@ import pytest
from solar.core.resource import composer
from solar.dblayer.model import clear_cache
from solar.errors import ExecutionTimeout
from solar import orchestration
from solar.orchestration.graph import wait_finish
from solar.orchestration.traversal import states
@ -26,21 +27,6 @@ from solar.system_log import change
from solar.system_log import data
@pytest.fixture
def tasks_address(address):
return address + 'tasks'
@pytest.fixture
def system_log_address(address):
return address + 'system_log'
@pytest.fixture
def scheduler_address(address):
return address + 'scheduler'
@pytest.fixture
def scheduler_client(scheduler_address):
return orchestration.Client(scheduler_address)
@ -78,16 +64,19 @@ def resources(request, sequence_vr):
@pytest.mark.parametrize('scale', [10])
def test_concurrent_sequences_with_no_handler(scale, scheduler_client):
total_resources = scale * 3
timeout = scale
timeout = scale * 2
assert len(change.stage_changes()) == total_resources
plan = change.send_to_orchestration()
scheduler_client.next({}, plan.graph['uid'])
def wait_function(timeout):
for summary in wait_finish(plan.graph['uid'], timeout):
assert summary[states.ERROR.name] == 0
time.sleep(0.5)
try:
for summary in wait_finish(plan.graph['uid'], timeout):
assert summary[states.ERROR.name] == 0
time.sleep(0.5)
except ExecutionTimeout:
pass
return summary
waiter = gevent.spawn(wait_function, timeout)
waiter.join(timeout=timeout)

View File

@ -52,6 +52,7 @@ def scheduler(tasks_for_scheduler, address):
worker.for_all.before(session_start)
worker.for_all.after(session_end)
executor = zerorpc_executor.Executor(worker, address)
gevent.spawn(executor.run)
return worker, zerorpc_executor.Client(address)

View File

@ -0,0 +1,71 @@
#
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import time
import gevent
import pytest
from solar.dblayer import ModelMeta
from solar.errors import ExecutionTimeout
from solar.orchestration import Client
from solar.orchestration import Executor
from solar.orchestration import graph
from solar.orchestration.traversal import states
from solar.orchestration import workers
@pytest.fixture(autouse=True)
def scheduler(scheduler_address, tasks_address):
scheduler = workers.Scheduler(Client(tasks_address))
scheduler_executor = Executor(scheduler, scheduler_address)
scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
gevent.spawn(scheduler_executor.run)
@pytest.fixture(autouse=True)
def tasks(tasks_address, scheduler_address):
scheduler = workers.SchedulerCallbackClient(
Client(scheduler_address))
tasks = workers.Tasks()
tasks_executor = Executor(tasks, tasks_address)
tasks.for_all.before(tasks_executor.register)
tasks.for_all.on_success(scheduler.update)
tasks.for_all.on_error(scheduler.error)
gevent.spawn(tasks_executor.run)
@pytest.fixture
def scheduler_client(scheduler_address):
return Client(scheduler_address)
def test_timelimit_plan(timelimit_plan, scheduler_client):
scheduler_client.next({}, timelimit_plan.graph['uid'])
def wait_function(timeout):
try:
for summary in graph.wait_finish(
timelimit_plan.graph['uid'], timeout):
time.sleep(0.5)
except ExecutionTimeout:
return summary
waiter = gevent.spawn(wait_function, 3)
waiter.join(timeout=3)
finished_plan = graph.get_graph(timelimit_plan.graph['uid'])
assert 'ExecutionTimeout' in finished_plan.node['t1']['errmsg']
assert finished_plan.node['t2']['status'] == states.PENDING.name

View File

@ -0,0 +1,12 @@
name: timelimit
tasks:
- uid: t1
parameters:
type: sleep
args: [10]
timelimit: 1
- uid: t2
parameters:
type: echo
args: ['message']
after: [t1]
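
With this fixture, t1 sleeps for 10 seconds but carries a timelimit of 1 second, so the watchdog kills it with ExecutionTimeout well before it finishes; t2, which runs only after t1, should never leave PENDING. That is exactly what test_timelimit_plan above asserts against t1's errmsg and t2's status.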