f805eec080
Change-Id: I85d130f2f6b8e68e5a9e0177238c8f5c569ab6f1
280 lines
10 KiB
Python
280 lines
10 KiB
Python
# Copyright (c) 2015 Mirantis Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import functools
|
|
|
|
import mock
|
|
import testtools
|
|
|
|
from shaker.engine.executors import base as base_executor
|
|
from shaker.engine import quorum as quorum_pkg
|
|
|
|
|
|
STEP = 10 # polling interval
|
|
LOSS_TIMEOUT = 60
|
|
JOIN_TIMEOUT = 600
|
|
|
|
make_quorum = functools.partial(quorum_pkg.Quorum, polling_interval=STEP,
|
|
agent_loss_timeout=LOSS_TIMEOUT,
|
|
agent_join_timeout=JOIN_TIMEOUT)
|
|
|
|
|
|
class DummyExecutor(base_executor.BaseExecutor):
|
|
def __init__(self, duration=STEP):
|
|
super(DummyExecutor, self).__init__({}, None)
|
|
self.duration = duration
|
|
|
|
def get_expected_duration(self):
|
|
return self.duration
|
|
|
|
def process_reply(self, message):
|
|
return super(DummyExecutor, self).process_reply(message)
|
|
|
|
def get_command(self):
|
|
return 'RUN'
|
|
|
|
|
|
class TestQuorum(testtools.TestCase):
|
|
|
|
def setUp(self):
|
|
self.mock_time = mock.Mock()
|
|
self._mock_patch = mock.patch('time.time', self.mock_time)
|
|
self._mock_patch.start()
|
|
return super(TestQuorum, self).setUp()
|
|
|
|
def tearDown(self):
|
|
self._mock_patch.stop()
|
|
return super(TestQuorum, self).tearDown()
|
|
|
|
def _reply(self, expected):
|
|
def reply_handler(reply_message):
|
|
self.assertEqual(expected, reply_message)
|
|
|
|
return reply_handler
|
|
|
|
def _message_queue_gen(self, event_stream, fail_at_end=True):
|
|
for event in event_stream:
|
|
self.mock_time.return_value = event['time']
|
|
yield (event['msg'], self._reply(event['reply']))
|
|
|
|
def test_poll_reply(self):
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='alpha'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP),
|
|
time=1),
|
|
dict(msg=dict(operation='reply', agent_id='alpha'),
|
|
reply=dict(operation='none'),
|
|
time=20),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream))
|
|
test_case = {
|
|
'alpha': DummyExecutor()
|
|
}
|
|
result = quorum.execute(test_case)
|
|
self.assertEqual(result.keys(), test_case.keys())
|
|
|
|
def test_poll_reply_unknown_agent_ignored(self):
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='alpha'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP),
|
|
time=1),
|
|
dict(msg=dict(operation='reply', agent_id='beta'),
|
|
reply=dict(operation='none'),
|
|
time=20),
|
|
dict(msg=dict(operation='reply', agent_id='alpha'),
|
|
reply=dict(operation='none'),
|
|
time=20),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream))
|
|
test_case = {
|
|
'alpha': DummyExecutor()
|
|
}
|
|
result = quorum.execute(test_case)
|
|
self.assertEqual(result.keys(), test_case.keys())
|
|
|
|
def test_lost_agent(self):
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='alpha'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP),
|
|
time=1),
|
|
dict(msg=dict(operation='reply',
|
|
agent_id=quorum_pkg.HEARTBEAT_AGENT),
|
|
reply=dict(operation='none'),
|
|
time=STEP * 10),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream))
|
|
test_case = {
|
|
'alpha': DummyExecutor()
|
|
}
|
|
result = quorum.execute(test_case)
|
|
self.assertEqual(result.keys(), test_case.keys())
|
|
self.assertEqual('lost', result['alpha']['status'])
|
|
|
|
def test_agent_loss_timeout(self):
|
|
"""Tests that agent is not marked as lost during loss-timeout."""
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='_lost'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP),
|
|
time=1),
|
|
dict(msg=dict(operation='reply',
|
|
agent_id=quorum_pkg.HEARTBEAT_AGENT),
|
|
reply=dict(operation='none'),
|
|
time=LOSS_TIMEOUT),
|
|
dict(msg=dict(operation='reply', agent_id='_lost'),
|
|
reply=dict(operation='none'),
|
|
time=LOSS_TIMEOUT),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream))
|
|
test_case = {
|
|
'_lost': DummyExecutor()
|
|
}
|
|
result = quorum.execute(test_case)
|
|
self.assertEqual(result.keys(), test_case.keys())
|
|
self.assertEqual('ok', result['_lost']['status'])
|
|
|
|
def test_good_and_lost(self):
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='_lost'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP),
|
|
time=1),
|
|
dict(msg=dict(operation='poll', agent_id='beta'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP),
|
|
time=2),
|
|
dict(msg=dict(operation='reply', agent_id='beta'),
|
|
reply=dict(operation='none'),
|
|
time=20),
|
|
dict(msg=dict(operation='reply',
|
|
agent_id=quorum_pkg.HEARTBEAT_AGENT),
|
|
reply=dict(operation='none'),
|
|
time=STEP * 10),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream))
|
|
test_case = {
|
|
'_lost': DummyExecutor(),
|
|
'beta': DummyExecutor(),
|
|
}
|
|
result = quorum.execute(test_case)
|
|
self.assertEqual(set(result.keys()), set(test_case.keys()))
|
|
self.assertEqual('lost', result['_lost']['status'])
|
|
self.assertEqual('ok', result['beta']['status'])
|
|
|
|
def test_wait_agentexecutening_long_test(self):
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='alpha'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP * 9),
|
|
time=1),
|
|
dict(msg=dict(operation='reply',
|
|
agent_id=quorum_pkg.HEARTBEAT_AGENT),
|
|
reply=dict(operation='none'),
|
|
time=STEP * 4),
|
|
dict(msg=dict(operation='reply', agent_id='alpha'),
|
|
reply=dict(operation='none'),
|
|
time=STEP * 9),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream))
|
|
test_case = {
|
|
'alpha': DummyExecutor(duration=STEP * 9)
|
|
}
|
|
result = quorum.execute(test_case)
|
|
self.assertEqual(result.keys(), test_case.keys())
|
|
self.assertEqual('ok', result['alpha']['status'])
|
|
|
|
def test_good_and_interrupted(self):
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='alpha'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP),
|
|
time=1),
|
|
dict(msg=dict(operation='poll', agent_id='beta'),
|
|
reply=dict(operation='execute', command='RUN',
|
|
start_at=STEP * 2, expected_duration=STEP),
|
|
time=2),
|
|
dict(msg=dict(operation='reply', agent_id='beta'),
|
|
reply=dict(operation='none'),
|
|
time=20),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream,
|
|
fail_at_end=False))
|
|
test_case = {
|
|
'alpha': DummyExecutor(),
|
|
'beta': DummyExecutor(),
|
|
}
|
|
result = quorum.execute(test_case)
|
|
self.assertEqual(result.keys(), test_case.keys())
|
|
self.assertEqual('interrupted', result['alpha']['status'])
|
|
self.assertEqual('ok', result['beta']['status'])
|
|
|
|
def test_join_succeed(self):
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='alpha'),
|
|
reply=dict(operation='configure', polling_interval=STEP,
|
|
expected_duration=0),
|
|
time=STEP),
|
|
dict(msg=dict(operation='reply', agent_id='alpha'),
|
|
reply=dict(operation='none'),
|
|
time=STEP * 2),
|
|
dict(msg=dict(operation='poll',
|
|
agent_id=quorum_pkg.HEARTBEAT_AGENT),
|
|
reply=dict(operation='none'),
|
|
time=STEP * 2),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream))
|
|
result = quorum.join(['alpha'])
|
|
lost = [agent_id for agent_id, r in result.items()
|
|
if r['status'] == 'lost']
|
|
self.assertEqual([], lost)
|
|
|
|
def test_join_failed(self):
|
|
self.mock_time.return_value = 0
|
|
event_stream = [
|
|
dict(msg=dict(operation='poll', agent_id='_lost'),
|
|
reply=dict(operation='configure', polling_interval=STEP,
|
|
expected_duration=0),
|
|
time=STEP),
|
|
dict(msg=dict(operation='reply',
|
|
agent_id=quorum_pkg.HEARTBEAT_AGENT),
|
|
reply=dict(operation='none'),
|
|
time=JOIN_TIMEOUT + STEP * 2),
|
|
]
|
|
|
|
quorum = make_quorum(self._message_queue_gen(event_stream))
|
|
result = quorum.join(['_lost'])
|
|
lost = [agent_id for agent_id, r in result.items()
|
|
if r['status'] == 'lost']
|
|
self.assertEqual(['_lost'], lost)
|