Move Quorum into a separate file
Also move reading of scenario into utils Change-Id: I3370e629eb074850c3f0c436e6542fae2ad8e537
This commit is contained in:
parent
1a5108fb3c
commit
29f211122e
154
shaker/engine/quorum.py
Normal file
154
shaker/engine/quorum.py
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
# Copyright (c) 2015 Mirantis Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class BaseOperation(object):
|
||||||
|
def get_agent_join_timeout(self):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def get_active_agent_ids(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_reply(self, agent_id, start_at):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def process_reply(self, agent_id, message):
|
||||||
|
return {'status': 'ok'}
|
||||||
|
|
||||||
|
def process_failure(self, agent_id):
|
||||||
|
return {'status': 'lost'}
|
||||||
|
|
||||||
|
|
||||||
|
class JoinOperation(BaseOperation):
|
||||||
|
def __init__(self, agent_ids, polling_interval, agent_join_timeout):
|
||||||
|
super(JoinOperation, self).__init__()
|
||||||
|
self.agent_ids = agent_ids
|
||||||
|
self.polling_interval = polling_interval
|
||||||
|
self.agent_join_timeout = agent_join_timeout
|
||||||
|
|
||||||
|
def get_agent_join_timeout(self):
|
||||||
|
return self.agent_join_timeout
|
||||||
|
|
||||||
|
def get_active_agent_ids(self):
|
||||||
|
return set(self.agent_ids)
|
||||||
|
|
||||||
|
def get_reply(self, agent_id, start_at):
|
||||||
|
return dict(operation='configure',
|
||||||
|
polling_interval=self.polling_interval,
|
||||||
|
expected_duration=0)
|
||||||
|
|
||||||
|
|
||||||
|
class ExecuteOperation(BaseOperation):
|
||||||
|
def __init__(self, executors):
|
||||||
|
super(ExecuteOperation, self).__init__()
|
||||||
|
self.executors = executors
|
||||||
|
|
||||||
|
def get_active_agent_ids(self):
|
||||||
|
return set(self.executors.keys())
|
||||||
|
|
||||||
|
def get_reply(self, agent_id, start_at):
|
||||||
|
reply = dict(operation='execute',
|
||||||
|
start_at=start_at,
|
||||||
|
command=self.executors[agent_id].get_command(),
|
||||||
|
expected_duration=(self.executors[agent_id].
|
||||||
|
get_expected_duration()))
|
||||||
|
return reply
|
||||||
|
|
||||||
|
def process_reply(self, agent_id, message):
|
||||||
|
r = super(ExecuteOperation, self).process_reply(agent_id, message)
|
||||||
|
r.update(self.executors[agent_id].process_reply(message))
|
||||||
|
return r
|
||||||
|
|
||||||
|
def process_failure(self, agent_id):
|
||||||
|
r = super(ExecuteOperation, self).process_failure(agent_id)
|
||||||
|
r.update(self.executors[agent_id].process_failure())
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
class Quorum(object):
|
||||||
|
def __init__(self, message_queue, polling_interval, agent_loss_timeout,
|
||||||
|
agent_join_timeout):
|
||||||
|
self.message_queue = message_queue
|
||||||
|
self.polling_interval = polling_interval
|
||||||
|
self.agent_loss_timeout = agent_loss_timeout
|
||||||
|
self.agent_join_timeout = agent_join_timeout
|
||||||
|
|
||||||
|
def _run(self, operation):
|
||||||
|
current = operation.get_active_agent_ids()
|
||||||
|
LOG.debug('Executing operation %s on agents: %s', operation, current)
|
||||||
|
|
||||||
|
working = set()
|
||||||
|
replied = set()
|
||||||
|
result = {}
|
||||||
|
|
||||||
|
start_at = time.time() + self.polling_interval * 2
|
||||||
|
lives = dict((agent_id, start_at + operation.get_agent_join_timeout())
|
||||||
|
for agent_id in current)
|
||||||
|
|
||||||
|
for message, reply_handler in self.message_queue:
|
||||||
|
agent_id = message.get('agent_id')
|
||||||
|
op = message.get('operation')
|
||||||
|
reply = {'operation': 'none'}
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
if agent_id in (current - replied):
|
||||||
|
# message from a known not yet worked agent
|
||||||
|
lives[agent_id] = (now + self.polling_interval * 2 +
|
||||||
|
self.agent_loss_timeout)
|
||||||
|
|
||||||
|
if op == 'poll':
|
||||||
|
reply = operation.get_reply(agent_id, start_at)
|
||||||
|
lives[agent_id] += reply.get('expected_duration')
|
||||||
|
working.add(agent_id)
|
||||||
|
LOG.debug('Working agents: %s', working)
|
||||||
|
elif op == 'reply':
|
||||||
|
if agent_id in working:
|
||||||
|
result[agent_id] = operation.process_reply(
|
||||||
|
agent_id, message)
|
||||||
|
replied.add(agent_id)
|
||||||
|
LOG.debug('Replied agents: %s', replied)
|
||||||
|
|
||||||
|
reply_handler(reply)
|
||||||
|
|
||||||
|
lost = set(a for a, t in lives.items() if t < now) - replied
|
||||||
|
if lost:
|
||||||
|
LOG.debug('Lost agents: %s', lost)
|
||||||
|
|
||||||
|
if replied | lost >= current:
|
||||||
|
if lost:
|
||||||
|
LOG.warning('Lost agents: %s', lost)
|
||||||
|
# update result with info about lost agents
|
||||||
|
for agent_id in lost:
|
||||||
|
result[agent_id] = operation.process_failure(agent_id)
|
||||||
|
|
||||||
|
LOG.info('Finished processing operation: %s', operation)
|
||||||
|
break
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def join(self, agent_ids):
|
||||||
|
LOG.debug('Waiting for quorum of agents: %s', agent_ids)
|
||||||
|
return self._run(JoinOperation(agent_ids, self.polling_interval,
|
||||||
|
self.agent_join_timeout))
|
||||||
|
|
||||||
|
def execute(self, executors):
|
||||||
|
return self._run(ExecuteOperation(executors))
|
@ -16,17 +16,16 @@
|
|||||||
import copy
|
import copy
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import yaml
|
|
||||||
|
|
||||||
from shaker.engine import config
|
from shaker.engine import config
|
||||||
from shaker.engine import deploy
|
from shaker.engine import deploy
|
||||||
from shaker.engine import executors as executors_classes
|
from shaker.engine import executors as executors_classes
|
||||||
from shaker.engine import messaging
|
from shaker.engine import messaging
|
||||||
|
from shaker.engine import quorum as quorum_pkg
|
||||||
from shaker.engine import report
|
from shaker.engine import report
|
||||||
from shaker.engine import utils
|
from shaker.engine import utils
|
||||||
|
|
||||||
@ -34,147 +33,6 @@ from shaker.engine import utils
|
|||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class BaseOperation(object):
|
|
||||||
def get_agent_join_timeout(self):
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def get_active_agent_ids(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_reply(self, agent_id, start_at):
|
|
||||||
return {}
|
|
||||||
|
|
||||||
def process_reply(self, agent_id, message):
|
|
||||||
return {'status': 'ok'}
|
|
||||||
|
|
||||||
def process_failure(self, agent_id):
|
|
||||||
return {'status': 'lost'}
|
|
||||||
|
|
||||||
|
|
||||||
class JoinOperation(BaseOperation):
|
|
||||||
def __init__(self, agent_ids, polling_interval, agent_join_timeout):
|
|
||||||
super(JoinOperation, self).__init__()
|
|
||||||
self.agent_ids = agent_ids
|
|
||||||
self.polling_interval = polling_interval
|
|
||||||
self.agent_join_timeout = agent_join_timeout
|
|
||||||
|
|
||||||
def get_agent_join_timeout(self):
|
|
||||||
return self.agent_join_timeout
|
|
||||||
|
|
||||||
def get_active_agent_ids(self):
|
|
||||||
return set(self.agent_ids)
|
|
||||||
|
|
||||||
def get_reply(self, agent_id, start_at):
|
|
||||||
return dict(operation='configure',
|
|
||||||
polling_interval=self.polling_interval,
|
|
||||||
expected_duration=0)
|
|
||||||
|
|
||||||
|
|
||||||
class ExecuteOperation(BaseOperation):
|
|
||||||
def __init__(self, executors):
|
|
||||||
super(ExecuteOperation, self).__init__()
|
|
||||||
self.executors = executors
|
|
||||||
|
|
||||||
def get_active_agent_ids(self):
|
|
||||||
return set(self.executors.keys())
|
|
||||||
|
|
||||||
def get_reply(self, agent_id, start_at):
|
|
||||||
reply = dict(operation='execute',
|
|
||||||
start_at=start_at,
|
|
||||||
command=self.executors[agent_id].get_command(),
|
|
||||||
expected_duration=(self.executors[agent_id].
|
|
||||||
get_expected_duration()))
|
|
||||||
return reply
|
|
||||||
|
|
||||||
def process_reply(self, agent_id, message):
|
|
||||||
r = super(ExecuteOperation, self).process_reply(agent_id, message)
|
|
||||||
r.update(self.executors[agent_id].process_reply(message))
|
|
||||||
return r
|
|
||||||
|
|
||||||
def process_failure(self, agent_id):
|
|
||||||
r = super(ExecuteOperation, self).process_failure(agent_id)
|
|
||||||
r.update(self.executors[agent_id].process_failure())
|
|
||||||
return r
|
|
||||||
|
|
||||||
|
|
||||||
class Quorum(object):
|
|
||||||
def __init__(self, message_queue, polling_interval, agent_loss_timeout,
|
|
||||||
agent_join_timeout):
|
|
||||||
self.message_queue = message_queue
|
|
||||||
self.polling_interval = polling_interval
|
|
||||||
self.agent_loss_timeout = agent_loss_timeout
|
|
||||||
self.agent_join_timeout = agent_join_timeout
|
|
||||||
|
|
||||||
def _run(self, operation):
|
|
||||||
current = operation.get_active_agent_ids()
|
|
||||||
LOG.debug('Executing operation %s on agents: %s', operation, current)
|
|
||||||
|
|
||||||
working = set()
|
|
||||||
replied = set()
|
|
||||||
result = {}
|
|
||||||
|
|
||||||
start_at = time.time() + self.polling_interval * 2
|
|
||||||
lives = dict((agent_id, start_at + operation.get_agent_join_timeout())
|
|
||||||
for agent_id in current)
|
|
||||||
|
|
||||||
for message, reply_handler in self.message_queue:
|
|
||||||
agent_id = message.get('agent_id')
|
|
||||||
op = message.get('operation')
|
|
||||||
reply = {'operation': 'none'}
|
|
||||||
now = time.time()
|
|
||||||
|
|
||||||
if agent_id in (current - replied):
|
|
||||||
# message from a known not yet worked agent
|
|
||||||
lives[agent_id] = (now + self.polling_interval * 2 +
|
|
||||||
self.agent_loss_timeout)
|
|
||||||
|
|
||||||
if op == 'poll':
|
|
||||||
reply = operation.get_reply(agent_id, start_at)
|
|
||||||
lives[agent_id] += reply.get('expected_duration')
|
|
||||||
working.add(agent_id)
|
|
||||||
LOG.debug('Working agents: %s', working)
|
|
||||||
elif op == 'reply':
|
|
||||||
if agent_id in working:
|
|
||||||
result[agent_id] = operation.process_reply(
|
|
||||||
agent_id, message)
|
|
||||||
replied.add(agent_id)
|
|
||||||
LOG.debug('Replied agents: %s', replied)
|
|
||||||
|
|
||||||
reply_handler(reply)
|
|
||||||
|
|
||||||
lost = set(a for a, t in lives.items() if t < now) - replied
|
|
||||||
if lost:
|
|
||||||
LOG.debug('Lost agents: %s', lost)
|
|
||||||
|
|
||||||
if replied | lost >= current:
|
|
||||||
if lost:
|
|
||||||
LOG.warning('Lost agents: %s', lost)
|
|
||||||
# update result with info about lost agents
|
|
||||||
for agent_id in lost:
|
|
||||||
result[agent_id] = operation.process_failure(agent_id)
|
|
||||||
|
|
||||||
LOG.info('Finished processing operation: %s', operation)
|
|
||||||
break
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
def join(self, agent_ids):
|
|
||||||
LOG.debug('Waiting for quorum of agents: %s', agent_ids)
|
|
||||||
return self._run(JoinOperation(agent_ids, self.polling_interval,
|
|
||||||
self.agent_join_timeout))
|
|
||||||
|
|
||||||
def execute(self, executors):
|
|
||||||
return self._run(ExecuteOperation(executors))
|
|
||||||
|
|
||||||
|
|
||||||
def read_scenario():
|
|
||||||
scenario_raw = utils.read_file(cfg.CONF.scenario)
|
|
||||||
scenario = yaml.safe_load(scenario_raw)
|
|
||||||
scenario['file_name'] = cfg.CONF.scenario
|
|
||||||
LOG.debug('Scenario: %s', scenario)
|
|
||||||
return scenario
|
|
||||||
|
|
||||||
|
|
||||||
def _extend_agents(agents_map):
|
def _extend_agents(agents_map):
|
||||||
extended_agents = {}
|
extended_agents = {}
|
||||||
for agent in agents_map.values():
|
for agent in agents_map.values():
|
||||||
@ -246,7 +104,8 @@ def main():
|
|||||||
config.REPORT_OPTS
|
config.REPORT_OPTS
|
||||||
)
|
)
|
||||||
|
|
||||||
scenario = read_scenario()
|
scenario = utils.read_yaml_file(cfg.CONF.scenario)
|
||||||
|
scenario['file_name'] = cfg.CONF.scenario
|
||||||
|
|
||||||
deployment = None
|
deployment = None
|
||||||
agents = {}
|
agents = {}
|
||||||
@ -272,9 +131,9 @@ def main():
|
|||||||
else:
|
else:
|
||||||
message_queue = messaging.MessageQueue(cfg.CONF.server_endpoint)
|
message_queue = messaging.MessageQueue(cfg.CONF.server_endpoint)
|
||||||
|
|
||||||
quorum = Quorum(message_queue, cfg.CONF.polling_interval,
|
quorum = quorum_pkg.Quorum(
|
||||||
cfg.CONF.agent_loss_timeout,
|
message_queue, cfg.CONF.polling_interval,
|
||||||
cfg.CONF.agent_join_timeout)
|
cfg.CONF.agent_loss_timeout, cfg.CONF.agent_join_timeout)
|
||||||
quorum.join(set(agents.keys()))
|
quorum.join(set(agents.keys()))
|
||||||
|
|
||||||
result = execute(quorum, scenario['execution'], agents)
|
result = execute(quorum, scenario['execution'], agents)
|
||||||
|
@ -23,6 +23,7 @@ import random
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import six
|
import six
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -106,6 +107,16 @@ def write_file(data, file_name, base_dir=''):
|
|||||||
fd.close()
|
fd.close()
|
||||||
|
|
||||||
|
|
||||||
|
def read_yaml_file(file_name):
|
||||||
|
raw = read_file(file_name)
|
||||||
|
try:
|
||||||
|
parsed = yaml.safe_load(raw)
|
||||||
|
return parsed
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error('Failed to parse file %(file)s in YAML format: %(err)s',
|
||||||
|
dict(file=file_name, err=e))
|
||||||
|
|
||||||
|
|
||||||
def split_address(address):
|
def split_address(address):
|
||||||
try:
|
try:
|
||||||
host, port = address.split(':')
|
host, port = address.split(':')
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from shaker.engine import messaging
|
from shaker.engine import messaging
|
||||||
|
from shaker.engine import quorum
|
||||||
from shaker.engine import server
|
from shaker.engine import server
|
||||||
|
|
||||||
|
|
||||||
@ -29,10 +30,10 @@ class Shaker(object):
|
|||||||
res = shaker.run_program('the-agent', 'ls -al')
|
res = shaker.run_program('the-agent', 'ls -al')
|
||||||
"""
|
"""
|
||||||
def __init__(self, server_endpoint, agent_ids, polling_interval=1,
|
def __init__(self, server_endpoint, agent_ids, polling_interval=1,
|
||||||
agent_loss_timeout=60):
|
agent_loss_timeout=60, agent_join_timeout=600):
|
||||||
message_queue = messaging.MessageQueue(server_endpoint)
|
message_queue = messaging.MessageQueue(server_endpoint)
|
||||||
self.quorum = server.Quorum(message_queue, polling_interval,
|
self.quorum = quorum.Quorum(message_queue, polling_interval,
|
||||||
agent_loss_timeout)
|
agent_loss_timeout, agent_join_timeout)
|
||||||
self.quorum.join(agent_ids)
|
self.quorum.join(agent_ids)
|
||||||
|
|
||||||
def _run(self, agent_id, item):
|
def _run(self, agent_id, item):
|
||||||
|
@ -19,14 +19,14 @@ import mock
|
|||||||
import testtools
|
import testtools
|
||||||
|
|
||||||
from shaker.engine.executors import base as base_executor
|
from shaker.engine.executors import base as base_executor
|
||||||
from shaker.engine import server
|
from shaker.engine import quorum
|
||||||
|
|
||||||
|
|
||||||
STEP = 10 # polling interval
|
STEP = 10 # polling interval
|
||||||
LOSS_TIMEOUT = 60
|
LOSS_TIMEOUT = 60
|
||||||
JOIN_TIMEOUT = 600
|
JOIN_TIMEOUT = 600
|
||||||
|
|
||||||
make_quorum = functools.partial(server.Quorum, polling_interval=STEP,
|
make_quorum = functools.partial(quorum.Quorum, polling_interval=STEP,
|
||||||
agent_loss_timeout=LOSS_TIMEOUT,
|
agent_loss_timeout=LOSS_TIMEOUT,
|
||||||
agent_join_timeout=JOIN_TIMEOUT)
|
agent_join_timeout=JOIN_TIMEOUT)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user