From 29f211122e9ed760a6247a2dd2e8f4f1f56105a7 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Tue, 7 Apr 2015 16:25:13 +0300 Subject: [PATCH] Move Quorum into a separate file Also move reading of scenario into utils Change-Id: I3370e629eb074850c3f0c436e6542fae2ad8e537 --- shaker/engine/quorum.py | 154 ++++++++++++++++++++++++++++++++++++++++ shaker/engine/server.py | 153 ++------------------------------------- shaker/engine/utils.py | 11 +++ shaker/lib.py | 7 +- tests/test_quorum.py | 4 +- 5 files changed, 177 insertions(+), 152 deletions(-) create mode 100644 shaker/engine/quorum.py diff --git a/shaker/engine/quorum.py b/shaker/engine/quorum.py new file mode 100644 index 0000000..46418da --- /dev/null +++ b/shaker/engine/quorum.py @@ -0,0 +1,154 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from oslo_log import log as logging + + +LOG = logging.getLogger(__name__) + + +class BaseOperation(object): + def get_agent_join_timeout(self): + return 0 + + def get_active_agent_ids(self): + pass + + def get_reply(self, agent_id, start_at): + return {} + + def process_reply(self, agent_id, message): + return {'status': 'ok'} + + def process_failure(self, agent_id): + return {'status': 'lost'} + + +class JoinOperation(BaseOperation): + def __init__(self, agent_ids, polling_interval, agent_join_timeout): + super(JoinOperation, self).__init__() + self.agent_ids = agent_ids + self.polling_interval = polling_interval + self.agent_join_timeout = agent_join_timeout + + def get_agent_join_timeout(self): + return self.agent_join_timeout + + def get_active_agent_ids(self): + return set(self.agent_ids) + + def get_reply(self, agent_id, start_at): + return dict(operation='configure', + polling_interval=self.polling_interval, + expected_duration=0) + + +class ExecuteOperation(BaseOperation): + def __init__(self, executors): + super(ExecuteOperation, self).__init__() + self.executors = executors + + def get_active_agent_ids(self): + return set(self.executors.keys()) + + def get_reply(self, agent_id, start_at): + reply = dict(operation='execute', + start_at=start_at, + command=self.executors[agent_id].get_command(), + expected_duration=(self.executors[agent_id]. + get_expected_duration())) + return reply + + def process_reply(self, agent_id, message): + r = super(ExecuteOperation, self).process_reply(agent_id, message) + r.update(self.executors[agent_id].process_reply(message)) + return r + + def process_failure(self, agent_id): + r = super(ExecuteOperation, self).process_failure(agent_id) + r.update(self.executors[agent_id].process_failure()) + return r + + +class Quorum(object): + def __init__(self, message_queue, polling_interval, agent_loss_timeout, + agent_join_timeout): + self.message_queue = message_queue + self.polling_interval = polling_interval + self.agent_loss_timeout = agent_loss_timeout + self.agent_join_timeout = agent_join_timeout + + def _run(self, operation): + current = operation.get_active_agent_ids() + LOG.debug('Executing operation %s on agents: %s', operation, current) + + working = set() + replied = set() + result = {} + + start_at = time.time() + self.polling_interval * 2 + lives = dict((agent_id, start_at + operation.get_agent_join_timeout()) + for agent_id in current) + + for message, reply_handler in self.message_queue: + agent_id = message.get('agent_id') + op = message.get('operation') + reply = {'operation': 'none'} + now = time.time() + + if agent_id in (current - replied): + # message from a known not yet worked agent + lives[agent_id] = (now + self.polling_interval * 2 + + self.agent_loss_timeout) + + if op == 'poll': + reply = operation.get_reply(agent_id, start_at) + lives[agent_id] += reply.get('expected_duration') + working.add(agent_id) + LOG.debug('Working agents: %s', working) + elif op == 'reply': + if agent_id in working: + result[agent_id] = operation.process_reply( + agent_id, message) + replied.add(agent_id) + LOG.debug('Replied agents: %s', replied) + + reply_handler(reply) + + lost = set(a for a, t in lives.items() if t < now) - replied + if lost: + LOG.debug('Lost agents: %s', lost) + + if replied | lost >= current: + if lost: + LOG.warning('Lost agents: %s', lost) + # update result with info about lost agents + for agent_id in lost: + result[agent_id] = operation.process_failure(agent_id) + + LOG.info('Finished processing operation: %s', operation) + break + + return result + + def join(self, agent_ids): + LOG.debug('Waiting for quorum of agents: %s', agent_ids) + return self._run(JoinOperation(agent_ids, self.polling_interval, + self.agent_join_timeout)) + + def execute(self, executors): + return self._run(ExecuteOperation(executors)) diff --git a/shaker/engine/server.py b/shaker/engine/server.py index 3fb412b..f11703b 100644 --- a/shaker/engine/server.py +++ b/shaker/engine/server.py @@ -16,17 +16,16 @@ import copy import json import os -import time import uuid from oslo_config import cfg from oslo_log import log as logging -import yaml from shaker.engine import config from shaker.engine import deploy from shaker.engine import executors as executors_classes from shaker.engine import messaging +from shaker.engine import quorum as quorum_pkg from shaker.engine import report from shaker.engine import utils @@ -34,147 +33,6 @@ from shaker.engine import utils LOG = logging.getLogger(__name__) -class BaseOperation(object): - def get_agent_join_timeout(self): - return 0 - - def get_active_agent_ids(self): - pass - - def get_reply(self, agent_id, start_at): - return {} - - def process_reply(self, agent_id, message): - return {'status': 'ok'} - - def process_failure(self, agent_id): - return {'status': 'lost'} - - -class JoinOperation(BaseOperation): - def __init__(self, agent_ids, polling_interval, agent_join_timeout): - super(JoinOperation, self).__init__() - self.agent_ids = agent_ids - self.polling_interval = polling_interval - self.agent_join_timeout = agent_join_timeout - - def get_agent_join_timeout(self): - return self.agent_join_timeout - - def get_active_agent_ids(self): - return set(self.agent_ids) - - def get_reply(self, agent_id, start_at): - return dict(operation='configure', - polling_interval=self.polling_interval, - expected_duration=0) - - -class ExecuteOperation(BaseOperation): - def __init__(self, executors): - super(ExecuteOperation, self).__init__() - self.executors = executors - - def get_active_agent_ids(self): - return set(self.executors.keys()) - - def get_reply(self, agent_id, start_at): - reply = dict(operation='execute', - start_at=start_at, - command=self.executors[agent_id].get_command(), - expected_duration=(self.executors[agent_id]. - get_expected_duration())) - return reply - - def process_reply(self, agent_id, message): - r = super(ExecuteOperation, self).process_reply(agent_id, message) - r.update(self.executors[agent_id].process_reply(message)) - return r - - def process_failure(self, agent_id): - r = super(ExecuteOperation, self).process_failure(agent_id) - r.update(self.executors[agent_id].process_failure()) - return r - - -class Quorum(object): - def __init__(self, message_queue, polling_interval, agent_loss_timeout, - agent_join_timeout): - self.message_queue = message_queue - self.polling_interval = polling_interval - self.agent_loss_timeout = agent_loss_timeout - self.agent_join_timeout = agent_join_timeout - - def _run(self, operation): - current = operation.get_active_agent_ids() - LOG.debug('Executing operation %s on agents: %s', operation, current) - - working = set() - replied = set() - result = {} - - start_at = time.time() + self.polling_interval * 2 - lives = dict((agent_id, start_at + operation.get_agent_join_timeout()) - for agent_id in current) - - for message, reply_handler in self.message_queue: - agent_id = message.get('agent_id') - op = message.get('operation') - reply = {'operation': 'none'} - now = time.time() - - if agent_id in (current - replied): - # message from a known not yet worked agent - lives[agent_id] = (now + self.polling_interval * 2 + - self.agent_loss_timeout) - - if op == 'poll': - reply = operation.get_reply(agent_id, start_at) - lives[agent_id] += reply.get('expected_duration') - working.add(agent_id) - LOG.debug('Working agents: %s', working) - elif op == 'reply': - if agent_id in working: - result[agent_id] = operation.process_reply( - agent_id, message) - replied.add(agent_id) - LOG.debug('Replied agents: %s', replied) - - reply_handler(reply) - - lost = set(a for a, t in lives.items() if t < now) - replied - if lost: - LOG.debug('Lost agents: %s', lost) - - if replied | lost >= current: - if lost: - LOG.warning('Lost agents: %s', lost) - # update result with info about lost agents - for agent_id in lost: - result[agent_id] = operation.process_failure(agent_id) - - LOG.info('Finished processing operation: %s', operation) - break - - return result - - def join(self, agent_ids): - LOG.debug('Waiting for quorum of agents: %s', agent_ids) - return self._run(JoinOperation(agent_ids, self.polling_interval, - self.agent_join_timeout)) - - def execute(self, executors): - return self._run(ExecuteOperation(executors)) - - -def read_scenario(): - scenario_raw = utils.read_file(cfg.CONF.scenario) - scenario = yaml.safe_load(scenario_raw) - scenario['file_name'] = cfg.CONF.scenario - LOG.debug('Scenario: %s', scenario) - return scenario - - def _extend_agents(agents_map): extended_agents = {} for agent in agents_map.values(): @@ -246,7 +104,8 @@ def main(): config.REPORT_OPTS ) - scenario = read_scenario() + scenario = utils.read_yaml_file(cfg.CONF.scenario) + scenario['file_name'] = cfg.CONF.scenario deployment = None agents = {} @@ -272,9 +131,9 @@ def main(): else: message_queue = messaging.MessageQueue(cfg.CONF.server_endpoint) - quorum = Quorum(message_queue, cfg.CONF.polling_interval, - cfg.CONF.agent_loss_timeout, - cfg.CONF.agent_join_timeout) + quorum = quorum_pkg.Quorum( + message_queue, cfg.CONF.polling_interval, + cfg.CONF.agent_loss_timeout, cfg.CONF.agent_join_timeout) quorum.join(set(agents.keys())) result = execute(quorum, scenario['execution'], agents) diff --git a/shaker/engine/utils.py b/shaker/engine/utils.py index bfb5c74..f57ac1a 100644 --- a/shaker/engine/utils.py +++ b/shaker/engine/utils.py @@ -23,6 +23,7 @@ import random from oslo_config import cfg from oslo_log import log as logging import six +import yaml LOG = logging.getLogger(__name__) @@ -106,6 +107,16 @@ def write_file(data, file_name, base_dir=''): fd.close() +def read_yaml_file(file_name): + raw = read_file(file_name) + try: + parsed = yaml.safe_load(raw) + return parsed + except Exception as e: + LOG.error('Failed to parse file %(file)s in YAML format: %(err)s', + dict(file=file_name, err=e)) + + def split_address(address): try: host, port = address.split(':') diff --git a/shaker/lib.py b/shaker/lib.py index 05ca25c..356e17d 100644 --- a/shaker/lib.py +++ b/shaker/lib.py @@ -16,6 +16,7 @@ from oslo_log import log as logging from shaker.engine import messaging +from shaker.engine import quorum from shaker.engine import server @@ -29,10 +30,10 @@ class Shaker(object): res = shaker.run_program('the-agent', 'ls -al') """ def __init__(self, server_endpoint, agent_ids, polling_interval=1, - agent_loss_timeout=60): + agent_loss_timeout=60, agent_join_timeout=600): message_queue = messaging.MessageQueue(server_endpoint) - self.quorum = server.Quorum(message_queue, polling_interval, - agent_loss_timeout) + self.quorum = quorum.Quorum(message_queue, polling_interval, + agent_loss_timeout, agent_join_timeout) self.quorum.join(agent_ids) def _run(self, agent_id, item): diff --git a/tests/test_quorum.py b/tests/test_quorum.py index b6a73af..6d4edda 100644 --- a/tests/test_quorum.py +++ b/tests/test_quorum.py @@ -19,14 +19,14 @@ import mock import testtools from shaker.engine.executors import base as base_executor -from shaker.engine import server +from shaker.engine import quorum STEP = 10 # polling interval LOSS_TIMEOUT = 60 JOIN_TIMEOUT = 600 -make_quorum = functools.partial(server.Quorum, polling_interval=STEP, +make_quorum = functools.partial(quorum.Quorum, polling_interval=STEP, agent_loss_timeout=LOSS_TIMEOUT, agent_join_timeout=JOIN_TIMEOUT)