Shutdown server gracefully

Change-Id: I0514d4a63d51f11d978261bd06e9fbe16a2b489a
This commit is contained in:
Ilya Shakhat
2015-05-07 18:17:08 +03:00
parent 889ece32d0
commit a8b4c7b8c2
3 changed files with 70 additions and 34 deletions

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import os
import shlex
import tempfile
@@ -94,15 +95,20 @@ def work(agent_id, endpoint, polling_interval):
while True:
task = poll_task(socket, agent_id)
start_at = task.get('start_at')
if start_at:
now = time.time()
start_at_str = datetime.datetime.fromtimestamp(
start_at).isoformat()
if start_at > now:
LOG.debug('Scheduling task at %s', start_at_str)
time.sleep(start_at - now)
else:
LOG.warning('Scheduling in the past: %s', start_at_str)
if task['operation'] == 'execute':
now = int(time.time())
start_at = task.get('start_at') or now
command = task.get('command')
LOG.debug('Scheduling command %s at %s', command, start_at)
time.sleep(start_at - now)
result = run_command(command)
result = run_command(task.get('command'))
send_reply(socket, agent_id, result)
elif task['operation'] == 'configure':

View File

@@ -24,6 +24,9 @@ from shaker.engine import messaging
LOG = logging.getLogger(__name__)
HEARTBEAT_AGENT = '__heartbeat'
CLEANER_AGENT = '__cleaner'
class BaseOperation(object):
def get_agent_join_timeout(self):
@@ -32,6 +35,9 @@ class BaseOperation(object):
def get_active_agent_ids(self):
pass
def get_default_reply(self, agent_id):
return {'operation': 'none'}
def get_reply(self, agent_id, start_at):
return {}
@@ -96,6 +102,21 @@ class ExecuteOperation(BaseOperation):
return r
class CleanOperation(BaseOperation):
def __init__(self, polling_interval):
self.polling_interval = polling_interval
def get_active_agent_ids(self):
return {CLEANER_AGENT}
def get_default_reply(self, agent_id):
reply = super(CleanOperation, self).get_default_reply(agent_id)
if agent_id != HEARTBEAT_AGENT:
# send all agents sleep
reply['start_at'] = time.time() + self.polling_interval * 4
return reply
class Quorum(object):
def __init__(self, message_queue, polling_interval, agent_loss_timeout,
agent_join_timeout):
@@ -104,6 +125,10 @@ class Quorum(object):
self.agent_loss_timeout = agent_loss_timeout
self.agent_join_timeout = agent_join_timeout
def __del__(self):
LOG.info('Cleaning the quorum')
self._run(CleanOperation(self.polling_interval))
def _run(self, operation):
current = operation.get_active_agent_ids()
LOG.debug('Executing operation %s on agents: %s', operation, current)
@@ -118,14 +143,14 @@ class Quorum(object):
for message, reply_handler in self.message_queue:
agent_id = message.get('agent_id')
op = message.get('operation')
reply = {'operation': 'none'}
reply = operation.get_default_reply(agent_id)
now = time.time()
if agent_id in (current - replied):
# message from a known not yet worked agent
lives[agent_id] = (now + self.polling_interval * 2 +
self.agent_loss_timeout)
op = message.get('operation')
if op == 'poll':
reply = operation.get_reply(agent_id, start_at)
@@ -147,7 +172,9 @@ class Quorum(object):
if replied | lost >= current:
if lost:
LOG.warning('Lost agents: %s', lost)
filtered = set(a for a in lost if a[0] != '_')
if filtered: # do not warn about private agents
LOG.warning('Lost agents: %s', filtered)
# update result with info about lost agents
for agent_id in lost:
result[agent_id] = operation.process_failure(agent_id)
@@ -169,7 +196,7 @@ class Quorum(object):
return result
def join(self, agent_ids):
LOG.debug('Waiting for quorum of agents: %s', agent_ids)
LOG.info('Waiting for quorum of agents: %s', agent_ids)
return self._run(JoinOperation(agent_ids, self.polling_interval,
self.agent_join_timeout))
@@ -183,7 +210,7 @@ def make_quorum(agent_ids, server_endpoint, polling_interval,
heartbeat = multiprocessing.Process(
target=agent_process.work,
kwargs=dict(agent_id='heartbeat', endpoint=server_endpoint,
kwargs=dict(agent_id=HEARTBEAT_AGENT, endpoint=server_endpoint,
polling_interval=polling_interval))
heartbeat.daemon = True
heartbeat.start()

View File

@@ -19,14 +19,14 @@ import mock
import testtools
from shaker.engine.executors import base as base_executor
from shaker.engine import quorum
from shaker.engine import quorum as quorum_pkg
STEP = 10 # polling interval
LOSS_TIMEOUT = 60
JOIN_TIMEOUT = 600
make_quorum = functools.partial(quorum.Quorum, polling_interval=STEP,
make_quorum = functools.partial(quorum_pkg.Quorum, polling_interval=STEP,
agent_loss_timeout=LOSS_TIMEOUT,
agent_join_timeout=JOIN_TIMEOUT)
@@ -69,9 +69,6 @@ class TestQuorum(testtools.TestCase):
self.mock_time.return_value = event['time']
yield (event['msg'], self._reply(event['reply']))
if fail_at_end:
self.fail('Unreachable state signalling that event loop hangs')
def test_poll_reply(self):
self.mock_time.return_value = 0
event_stream = [
@@ -120,7 +117,8 @@ class TestQuorum(testtools.TestCase):
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2, expected_duration=STEP),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
dict(msg=dict(operation='reply',
agent_id=quorum_pkg.HEARTBEAT_AGENT),
reply=dict(operation='none'),
time=STEP * 10),
]
@@ -137,30 +135,31 @@ class TestQuorum(testtools.TestCase):
"""Tests that agent is not marked as lost during loss-timeout."""
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
dict(msg=dict(operation='poll', agent_id='_lost'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2, expected_duration=STEP),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
dict(msg=dict(operation='reply',
agent_id=quorum_pkg.HEARTBEAT_AGENT),
reply=dict(operation='none'),
time=LOSS_TIMEOUT),
dict(msg=dict(operation='reply', agent_id='alpha'),
dict(msg=dict(operation='reply', agent_id='_lost'),
reply=dict(operation='none'),
time=LOSS_TIMEOUT),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
test_case = {
'alpha': DummyExecutor()
'_lost': DummyExecutor()
}
result = quorum.execute(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('ok', result['alpha']['status'])
self.assertEqual('ok', result['_lost']['status'])
def test_good_and_lost(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
dict(msg=dict(operation='poll', agent_id='_lost'),
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2, expected_duration=STEP),
time=1),
@@ -171,19 +170,20 @@ class TestQuorum(testtools.TestCase):
dict(msg=dict(operation='reply', agent_id='beta'),
reply=dict(operation='none'),
time=20),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
dict(msg=dict(operation='reply',
agent_id=quorum_pkg.HEARTBEAT_AGENT),
reply=dict(operation='none'),
time=STEP * 10),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
test_case = {
'alpha': DummyExecutor(),
'_lost': DummyExecutor(),
'beta': DummyExecutor(),
}
result = quorum.execute(test_case)
self.assertEqual(result.keys(), test_case.keys())
self.assertEqual('lost', result['alpha']['status'])
self.assertEqual('lost', result['_lost']['status'])
self.assertEqual('ok', result['beta']['status'])
def test_wait_agentexecutening_long_test(self):
@@ -193,7 +193,8 @@ class TestQuorum(testtools.TestCase):
reply=dict(operation='execute', command='RUN',
start_at=STEP * 2, expected_duration=STEP * 9),
time=1),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
dict(msg=dict(operation='reply',
agent_id=quorum_pkg.HEARTBEAT_AGENT),
reply=dict(operation='none'),
time=STEP * 4),
dict(msg=dict(operation='reply', agent_id='alpha'),
@@ -246,7 +247,8 @@ class TestQuorum(testtools.TestCase):
dict(msg=dict(operation='reply', agent_id='alpha'),
reply=dict(operation='none'),
time=STEP * 2),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
dict(msg=dict(operation='poll',
agent_id=quorum_pkg.HEARTBEAT_AGENT),
reply=dict(operation='none'),
time=STEP * 2),
]
@@ -260,17 +262,18 @@ class TestQuorum(testtools.TestCase):
def test_join_failed(self):
self.mock_time.return_value = 0
event_stream = [
dict(msg=dict(operation='poll', agent_id='alpha'),
dict(msg=dict(operation='poll', agent_id='_lost'),
reply=dict(operation='configure', polling_interval=STEP,
expected_duration=0),
time=STEP),
dict(msg=dict(operation='reply', agent_id='heartbeat'),
dict(msg=dict(operation='reply',
agent_id=quorum_pkg.HEARTBEAT_AGENT),
reply=dict(operation='none'),
time=JOIN_TIMEOUT + STEP * 2),
]
quorum = make_quorum(self._message_queue_gen(event_stream))
result = quorum.join(['alpha'])
result = quorum.join(['_lost'])
lost = [agent_id for agent_id, r in result.items()
if r['status'] == 'lost']
self.assertEqual(['alpha'], lost)
self.assertEqual(['_lost'], lost)