From ad70d89847297edcfcb21b7e88a0d16bea67a520 Mon Sep 17 00:00:00 2001 From: Russell Haering <russell.haering@rackspace.com> Date: Wed, 15 Jan 2014 15:22:12 -0800 Subject: [PATCH] split out base and agent modules --- teeth_agent/agent.py | 251 +++++++++++++++++++++++++++++++++++++++++++ teeth_agent/base.py | 224 -------------------------------------- 2 files changed, 251 insertions(+), 224 deletions(-) create mode 100644 teeth_agent/agent.py diff --git a/teeth_agent/agent.py b/teeth_agent/agent.py new file mode 100644 index 000000000..ce19adcc7 --- /dev/null +++ b/teeth_agent/agent.py @@ -0,0 +1,251 @@ +""" +Copyright 2013 Rackspace, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import collections +import random +import socket +import threading +import time +import urlparse + +from cherrypy import wsgiserver +import pkg_resources +from stevedore import driver +import structlog +from teeth_rest import encoding +from teeth_rest import errors as rest_errors + +from teeth_agent import api +from teeth_agent import base +from teeth_agent import errors +from teeth_agent import hardware +from teeth_agent import overlord_agent_api + + +class TeethAgentStatus(encoding.Serializable): + def __init__(self, mode, started_at, version): + self.mode = mode + self.started_at = started_at + self.version = version + + def serialize(self, view): + """Turn the status into a dict.""" + return collections.OrderedDict([ + ('mode', self.mode), + ('started_at', self.started_at), + ('version', self.version), + ]) + + +class TeethAgentHeartbeater(threading.Thread): + # If we could wait at most N seconds between heartbeats (or in case of an + # error) we will instead wait r x N seconds, where r is a random value + # between these multipliers. + min_jitter_multiplier = 0.3 + max_jitter_multiplier = 0.6 + + # Exponential backoff values used in case of an error. In reality we will + # only wait a portion of either of these delays based on the jitter + # multipliers. + initial_delay = 1.0 + max_delay = 300.0 + backoff_factor = 2.7 + + def __init__(self, agent): + super(TeethAgentHeartbeater, self).__init__() + self.agent = agent + self.api = overlord_agent_api.APIClient(agent.api_url) + self.log = structlog.get_logger(api_url=agent.api_url) + self.stop_event = threading.Event() + self.error_delay = self.initial_delay + + def run(self): + # The first heartbeat happens now + self.log.info('starting heartbeater') + interval = 0 + + while not self.stop_event.wait(interval): + next_heartbeat_by = self.do_heartbeat() + interval_multiplier = random.uniform(self.min_jitter_multiplier, + self.max_jitter_multiplier) + interval = (next_heartbeat_by - time.time()) * interval_multiplier + self.log.info('sleeping before next heartbeat', interval=interval) + + def do_heartbeat(self): + try: + deadline = self.api.heartbeat( + mac_addr=self.agent.get_agent_mac_addr(), + url=self.agent.get_agent_url(), + version=self.agent.version, + mode=self.agent.mode_implementation.name) + self.error_delay = self.initial_delay + self.log.info('heartbeat successful') + except Exception as e: + self.log.error('error sending heartbeat', exception=e) + deadline = time.time() + self.error_delay + self.error_delay = min(self.error_delay * self.backoff_factor, + self.max_delay) + pass + + return deadline + + def stop(self): + self.log.info('stopping heartbeater') + self.stop_event.set() + return self.join() + + +class TeethAgent(object): + def __init__(self, api_url, listen_address, advertise_address, mode_impl): + self.api_url = api_url + self.listen_address = listen_address + self.advertise_address = advertise_address + self.mode_implementation = mode_impl + self.version = pkg_resources.get_distribution('teeth-agent').version + self.api = api.TeethAgentAPIServer(self) + self.command_results = collections.OrderedDict() + self.heartbeater = TeethAgentHeartbeater(self) + self.hardware = hardware.HardwareInspector() + self.command_lock = threading.Lock() + self.log = structlog.get_logger() + self.started_at = None + + def get_status(self): + """Retrieve a serializable status.""" + return TeethAgentStatus( + mode=self.mode_implementation.name, + started_at=self.started_at, + version=self.version + ) + + def get_agent_url(self): + # If we put this behind any sort of proxy (ie, stunnel) we're going to + # need to (re)think this. + return 'http://{host}:{port}/'.format(host=self.advertise_address[0], + port=self.advertise_address[1]) + + def get_agent_mac_addr(self): + return self.hardware.get_primary_mac_address() + + def list_command_results(self): + return self.command_results.values() + + def get_command_result(self, result_id): + try: + return self.command_results[result_id] + except KeyError: + raise errors.RequestedObjectNotFoundError('Command Result', + result_id) + + def execute_command(self, command_name, **kwargs): + """Execute an agent command.""" + with self.command_lock: + if len(self.command_results) > 0: + last_command = self.command_results.values()[-1] + if not last_command.is_done(): + raise errors.CommandExecutionError('agent is busy') + + if command_name not in self.mode_implementation: + raise errors.InvalidCommandError(command_name) + + try: + command_fn = self.mode_implementation[command_name] + result = command_fn(command_name, **kwargs) + if not isinstance(result, base.BaseCommandResult): + result = base.SyncCommandResult(command_name, + kwargs, + True, + result) + except rest_errors.InvalidContentError as e: + # Any command may raise a InvalidContentError which will be + # returned to the caller directly. + raise e + except Exception as e: + # Other errors are considered command execution errors, and are + # recorded as an + result = base.SyncCommandResult(command_name, kwargs, False, e) + + self.command_results[result.id] = result + return result + + def run(self): + """Run the Teeth Agent.""" + self.started_at = time.time() + self.heartbeater.start() + server = wsgiserver.CherryPyWSGIServer(self.listen_address, self.api) + + try: + server.start() + except BaseException as e: + self.log.error('shutting down', exception=e) + server.stop() + + self.heartbeater.stop() + + +def _get_api_facing_ip_address(api_url): + """Note: this will raise an exception if anything goes wrong. That is + expected to be fine, if we can't get to the agent API there isn't much + point in starting up. Just crash and rely on the process manager to + restart us in a sane fashion. + """ + api_addr = urlparse.urlparse(api_url) + + if api_addr.scheme not in ('http', 'https'): + raise RuntimeError('API URL scheme must be one of \'http\' or ' + '\'https\'.') + + api_port = api_addr.port or {'http': 80, 'https': 443}[api_addr.scheme] + api_host = api_addr.hostname + + conn = socket.create_connection((api_host, api_port)) + listen_ip = conn.getsockname()[0] + conn.close() + + return listen_ip + + +def _load_mode_implementation(mode_name): + mgr = driver.DriverManager( + namespace='teeth_agent.modes', + name=mode_name, + invoke_on_load=True, + invoke_args=[], + ) + return mgr.driver + + +def build_agent(api_url, + listen_host, + listen_port, + advertise_host, + advertise_port): + + if not advertise_host: + advertise_host = _get_api_facing_ip_address(api_url) + + if not listen_host: + listen_host = advertise_host + + # TODO(russellhaering): Load this from the configuration API + mode_name = 'standby' + + mode_implementation = _load_mode_implementation(mode_name) + + return TeethAgent(api_url, + (listen_host, listen_port), + (advertise_host, advertise_port), + mode_implementation) diff --git a/teeth_agent/base.py b/teeth_agent/base.py index 7508207ee..f69fbe49c 100644 --- a/teeth_agent/base.py +++ b/teeth_agent/base.py @@ -16,39 +16,14 @@ limitations under the License. import abc import collections -import random -import socket import threading -import time -import urlparse import uuid -from cherrypy import wsgiserver -import pkg_resources -from stevedore import driver import structlog from teeth_rest import encoding from teeth_rest import errors as rest_errors -from teeth_agent import api from teeth_agent import errors -from teeth_agent import hardware -from teeth_agent import overlord_agent_api - - -class TeethAgentStatus(encoding.Serializable): - def __init__(self, mode, started_at, version): - self.mode = mode - self.started_at = started_at - self.version = version - - def serialize(self, view): - """Turn the status into a dict.""" - return collections.OrderedDict([ - ('mode', self.mode), - ('started_at', self.started_at), - ('version', self.version), - ]) class AgentCommandStatus(object): @@ -143,207 +118,8 @@ class AsyncCommandResult(BaseCommandResult): pass -class TeethAgentHeartbeater(threading.Thread): - # If we could wait at most N seconds between heartbeats (or in case of an - # error) we will instead wait r x N seconds, where r is a random value - # between these multipliers. - min_jitter_multiplier = 0.3 - max_jitter_multiplier = 0.6 - - # Exponential backoff values used in case of an error. In reality we will - # only wait a portion of either of these delays based on the jitter - # multipliers. - initial_delay = 1.0 - max_delay = 300.0 - backoff_factor = 2.7 - - def __init__(self, agent): - super(TeethAgentHeartbeater, self).__init__() - self.agent = agent - self.api = overlord_agent_api.APIClient(agent.api_url) - self.log = structlog.get_logger(api_url=agent.api_url) - self.stop_event = threading.Event() - self.error_delay = self.initial_delay - - def run(self): - # The first heartbeat happens now - self.log.info('starting heartbeater') - interval = 0 - - while not self.stop_event.wait(interval): - next_heartbeat_by = self.do_heartbeat() - interval_multiplier = random.uniform(self.min_jitter_multiplier, - self.max_jitter_multiplier) - interval = (next_heartbeat_by - time.time()) * interval_multiplier - self.log.info('sleeping before next heartbeat', interval=interval) - - def do_heartbeat(self): - try: - deadline = self.api.heartbeat( - mac_addr=self.agent.get_agent_mac_addr(), - url=self.agent.get_agent_url(), - version=self.agent.version, - mode=self.agent.mode_implementation.name) - self.error_delay = self.initial_delay - self.log.info('heartbeat successful') - except Exception as e: - self.log.error('error sending heartbeat', exception=e) - deadline = time.time() + self.error_delay - self.error_delay = min(self.error_delay * self.backoff_factor, - self.max_delay) - pass - - return deadline - - def stop(self): - self.log.info('stopping heartbeater') - self.stop_event.set() - return self.join() - - class BaseAgentMode(dict): def __init__(self, name): super(BaseAgentMode, self).__init__() self.log = structlog.get_logger(agent_mode=name) self.name = name - - -class TeethAgent(object): - def __init__(self, - listen_host, - listen_port, - advertise_host, - advertise_port, - api_url): - self.listen_host = listen_host - self.listen_port = listen_port - self.advertise_host = advertise_host - self.advertise_port = advertise_port - self.api_url = api_url - self.started_at = None - self.mode_implementation = None - self.version = pkg_resources.get_distribution('teeth-agent').version - self.api = api.TeethAgentAPIServer(self) - self.command_results = collections.OrderedDict() - self.heartbeater = TeethAgentHeartbeater(self) - self.hardware = hardware.HardwareInspector() - self.command_lock = threading.Lock() - self.log = structlog.get_logger() - - def get_status(self): - """Retrieve a serializable status.""" - return TeethAgentStatus( - mode=self.mode_implementation.name, - started_at=self.started_at, - version=self.version - ) - - def get_agent_url(self): - # If we put this behind any sort of proxy (ie, stunnel) we're going to - # need to (re)think this. - return 'http://{host}:{port}/'.format(host=self.advertise_host, - port=self.advertise_port) - - def get_api_facing_ip_address(self): - """Note: this will raise an exception if anything goes wrong. That is - expected to be fine, if we can't get to the agent API there isn't much - point in starting up. Just crash and rely on the process manager to - restart us in a sane fashion. - """ - api_addr = urlparse.urlparse(self.api_url) - - if api_addr.scheme not in ('http', 'https'): - raise RuntimeError('API URL scheme must be one of \'http\' or ' - '\'https\'.') - - api_port = api_addr.port or {'http': 80, 'https': 443}[api_addr.scheme] - api_host = api_addr.hostname - - self.log.info('attempting to resolve listen IP', - api_host=api_host, - api_port=api_port) - - conn = socket.create_connection((api_host, api_port)) - listen_ip = conn.getsockname()[0] - conn.close() - self.log.info('resolved listen IP', listen_ip=listen_ip) - - return listen_ip - - def load_mode_implementation(self, mode_name): - mgr = driver.DriverManager( - namespace='teeth_agent.modes', - name=mode_name, - invoke_on_load=True, - invoke_args=[], - ) - return mgr.driver - - def get_agent_mac_addr(self): - return self.hardware.get_primary_mac_address() - - def list_command_results(self): - return self.command_results.values() - - def get_command_result(self, result_id): - try: - return self.command_results[result_id] - except KeyError: - raise errors.RequestedObjectNotFoundError('Command Result', - result_id) - - def execute_command(self, command_name, **kwargs): - """Execute an agent command.""" - with self.command_lock: - if len(self.command_results) > 0: - last_command = self.command_results.values()[-1] - if not last_command.is_done(): - raise errors.CommandExecutionError('agent is busy') - - if command_name not in self.mode_implementation: - raise errors.InvalidCommandError(command_name) - - try: - command_fn = self.mode_implementation[command_name] - result = command_fn(command_name, **kwargs) - if not isinstance(result, BaseCommandResult): - result = SyncCommandResult(command_name, - kwargs, - True, - result) - except rest_errors.InvalidContentError as e: - # Any command may raise a InvalidContentError which will be - # returned to the caller directly. - raise e - except Exception as e: - # Other errors are considered command execution errors, and are - # recorded as an - result = SyncCommandResult(command_name, kwargs, False, e) - - self.command_results[result.id] = result - return result - - def run(self): - """Run the Teeth Agent.""" - self.started_at = time.time() - - if not self.advertise_host: - self.advertise_host = self.get_api_facing_ip_address() - - if not self.listen_host: - self.listen_host = self.advertise_host - - self.mode_implementation = self.load_mode_implementation('standby') - - self.heartbeater.start() - - listen_address = (self.listen_host, self.listen_port) - server = wsgiserver.CherryPyWSGIServer(listen_address, self.api) - - try: - server.start() - except BaseException as e: - self.log.error('shutting down', exception=e) - server.stop() - - self.heartbeater.stop()