split out base and agent modules

This commit is contained in:
Russell Haering 2014-01-15 15:22:12 -08:00
parent c62cf1a7fa
commit ad70d89847
2 changed files with 251 additions and 224 deletions
teeth_agent

251
teeth_agent/agent.py Normal file

@ -0,0 +1,251 @@
"""
Copyright 2013 Rackspace, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import collections
import random
import socket
import threading
import time
import urlparse
from cherrypy import wsgiserver
import pkg_resources
from stevedore import driver
import structlog
from teeth_rest import encoding
from teeth_rest import errors as rest_errors
from teeth_agent import api
from teeth_agent import base
from teeth_agent import errors
from teeth_agent import hardware
from teeth_agent import overlord_agent_api
class TeethAgentStatus(encoding.Serializable):
    """Serializable snapshot of the agent's mode, start time and version."""

    def __init__(self, mode, started_at, version):
        self.mode, self.started_at, self.version = mode, started_at, version

    def serialize(self, view):
        """Return the status fields as an ordered mapping.

        ``view`` is accepted but is not consulted by this implementation.
        """
        status = collections.OrderedDict()
        status['mode'] = self.mode
        status['started_at'] = self.started_at
        status['version'] = self.version
        return status
class TeethAgentHeartbeater(threading.Thread):
    """Thread that repeatedly sends heartbeats to the agent API.

    Each heartbeat yields a deadline (an absolute ``time.time()`` value) for
    the next one; the thread sleeps for a randomly jittered fraction of the
    time remaining until that deadline. Failed heartbeats are retried with
    exponential backoff.
    """

    # If we could wait at most N seconds between heartbeats (or in case of an
    # error) we will instead wait r x N seconds, where r is a random value
    # between these multipliers.
    min_jitter_multiplier = 0.3
    max_jitter_multiplier = 0.6

    # Exponential backoff values used in case of an error. In reality we will
    # only wait a portion of either of these delays based on the jitter
    # multipliers.
    initial_delay = 1.0
    max_delay = 300.0
    backoff_factor = 2.7

    def __init__(self, agent):
        """Create a heartbeater reporting on behalf of ``agent``.

        :param agent: object providing ``api_url``, ``version``,
                      ``mode_implementation``, ``get_agent_mac_addr()`` and
                      ``get_agent_url()``.
        """
        super(TeethAgentHeartbeater, self).__init__()
        self.agent = agent
        self.api = overlord_agent_api.APIClient(agent.api_url)
        self.log = structlog.get_logger(api_url=agent.api_url)
        self.stop_event = threading.Event()
        self.error_delay = self.initial_delay

    def run(self):
        """Heartbeat in a loop until :meth:`stop` sets the stop event."""
        # The first heartbeat happens now
        self.log.info('starting heartbeater')
        interval = 0
        while not self.stop_event.wait(interval):
            next_heartbeat_by = self.do_heartbeat()
            interval = self._jittered_interval(next_heartbeat_by)
            self.log.info('sleeping before next heartbeat', interval=interval)

    def _jittered_interval(self, next_heartbeat_by):
        """Return how long to sleep before the next heartbeat.

        The time remaining until ``next_heartbeat_by`` is scaled by a random
        multiplier between the two jitter bounds. A deadline that has already
        passed would give a negative value, so the result is clamped to zero
        (heartbeat again immediately).
        """
        interval_multiplier = random.uniform(self.min_jitter_multiplier,
                                             self.max_jitter_multiplier)
        remaining = next_heartbeat_by - time.time()
        return max(0.0, remaining * interval_multiplier)

    def do_heartbeat(self):
        """Send one heartbeat and return the deadline for the next one.

        On success the error backoff is reset and the value returned by the
        API client is used as the deadline. On any exception the error is
        logged, the deadline becomes "now + current error delay", and the
        error delay is multiplied by ``backoff_factor`` (capped at
        ``max_delay``) for the next failure.
        """
        try:
            deadline = self.api.heartbeat(
                mac_addr=self.agent.get_agent_mac_addr(),
                url=self.agent.get_agent_url(),
                version=self.agent.version,
                mode=self.agent.mode_implementation.name)
            self.error_delay = self.initial_delay
            self.log.info('heartbeat successful')
        except Exception as e:
            # Deliberately broad: a failed heartbeat must never kill the
            # thread, it is simply retried after a backoff.
            self.log.error('error sending heartbeat', exception=e)
            deadline = time.time() + self.error_delay
            self.error_delay = min(self.error_delay * self.backoff_factor,
                                   self.max_delay)

        return deadline

    def stop(self):
        """Signal the loop to exit and wait for the thread to finish."""
        self.log.info('stopping heartbeater')
        self.stop_event.set()
        return self.join()
class TeethAgent(object):
    """The agent: serves the command API and heartbeats to the agent API."""

    def __init__(self, api_url, listen_address, advertise_address, mode_impl):
        """Wire up the agent's collaborators; nothing is started here.

        :param api_url: URL of the API the heartbeater reports to.
        :param listen_address: ``(host, port)`` the WSGI server binds to.
        :param advertise_address: ``(host, port)`` used to build the URL
                                  reported in heartbeats.
        :param mode_impl: mode implementation; looked up by command name and
                          expected to expose a ``name`` attribute.
        """
        self.api_url = api_url
        self.listen_address = listen_address
        self.advertise_address = advertise_address
        self.mode_implementation = mode_impl
        self.version = pkg_resources.get_distribution('teeth-agent').version
        self.api = api.TeethAgentAPIServer(self)
        self.command_results = collections.OrderedDict()
        self.heartbeater = TeethAgentHeartbeater(self)
        self.hardware = hardware.HardwareInspector()
        self.command_lock = threading.Lock()
        self.log = structlog.get_logger()
        self.started_at = None

    def get_status(self):
        """Retrieve a serializable status."""
        return TeethAgentStatus(
            mode=self.mode_implementation.name,
            started_at=self.started_at,
            version=self.version
        )

    def get_agent_url(self):
        """Return the ``http://host:port/`` URL built from the advertise
        address.
        """
        # If we put this behind any sort of proxy (ie, stunnel) we're going to
        # need to (re)think this.
        return 'http://{host}:{port}/'.format(host=self.advertise_address[0],
                                              port=self.advertise_address[1])

    def get_agent_mac_addr(self):
        """Return the primary MAC address reported by the hardware
        inspector.
        """
        return self.hardware.get_primary_mac_address()

    def list_command_results(self):
        """Return all recorded command results, oldest first."""
        return self.command_results.values()

    def get_command_result(self, result_id):
        """Return the command result stored under ``result_id``.

        :raises errors.RequestedObjectNotFoundError: if no such result
            exists.
        """
        try:
            return self.command_results[result_id]
        except KeyError:
            raise errors.RequestedObjectNotFoundError('Command Result',
                                                      result_id)

    def execute_command(self, command_name, **kwargs):
        """Execute an agent command.

        Runs under ``command_lock``. The command is looked up by name in the
        mode implementation and called with the command name followed by
        ``kwargs``. The resulting command result is stored under its ``id``
        and returned.

        :raises errors.CommandExecutionError: if the most recently recorded
            command is not done yet.
        :raises errors.InvalidCommandError: if the mode implementation has no
            command called ``command_name``.
        :raises rest_errors.InvalidContentError: propagated unchanged from
            the command; nothing is recorded in that case.
        """
        with self.command_lock:
            if self.command_results:
                # Most recently recorded result; the OrderedDict keeps
                # insertion order so it is the last value.
                last_command = next(reversed(self.command_results.values()))
                if not last_command.is_done():
                    raise errors.CommandExecutionError('agent is busy')

            if command_name not in self.mode_implementation:
                raise errors.InvalidCommandError(command_name)

            try:
                command_fn = self.mode_implementation[command_name]
                result = command_fn(command_name, **kwargs)
                if not isinstance(result, base.BaseCommandResult):
                    # Plain return values are wrapped as a successful
                    # synchronous result.
                    result = base.SyncCommandResult(command_name,
                                                    kwargs,
                                                    True,
                                                    result)
            except rest_errors.InvalidContentError:
                # Any command may raise a InvalidContentError which will be
                # returned to the caller directly. A bare raise keeps the
                # original traceback intact.
                raise
            except Exception as e:
                # Other errors are considered command execution errors, and
                # are recorded as an unsuccessful synchronous result.
                result = base.SyncCommandResult(command_name, kwargs, False, e)

            self.command_results[result.id] = result
            return result

    def run(self):
        """Run the Teeth Agent.

        Records the start time, starts the heartbeater and serves the API on
        ``listen_address``. If ``server.start()`` raises anything (including
        ``KeyboardInterrupt``/``SystemExit``), the exception is logged and
        both the server and the heartbeater are stopped.
        """
        self.started_at = time.time()
        self.heartbeater.start()
        server = wsgiserver.CherryPyWSGIServer(self.listen_address, self.api)

        try:
            server.start()
        except BaseException as e:
            self.log.error('shutting down', exception=e)
            server.stop()
            self.heartbeater.stop()
def _get_api_facing_ip_address(api_url):
"""Note: this will raise an exception if anything goes wrong. That is
expected to be fine, if we can't get to the agent API there isn't much
point in starting up. Just crash and rely on the process manager to
restart us in a sane fashion.
"""
api_addr = urlparse.urlparse(api_url)
if api_addr.scheme not in ('http', 'https'):
raise RuntimeError('API URL scheme must be one of \'http\' or '
'\'https\'.')
api_port = api_addr.port or {'http': 80, 'https': 443}[api_addr.scheme]
api_host = api_addr.hostname
conn = socket.create_connection((api_host, api_port))
listen_ip = conn.getsockname()[0]
conn.close()
return listen_ip
def _load_mode_implementation(mode_name):
    """Load the ``teeth_agent.modes`` driver named ``mode_name``.

    The driver is invoked on load with no positional arguments and the
    resulting driver object is returned.
    """
    manager_options = {
        'namespace': 'teeth_agent.modes',
        'name': mode_name,
        'invoke_on_load': True,
        'invoke_args': [],
    }
    return driver.DriverManager(**manager_options).driver
def build_agent(api_url,
                listen_host,
                listen_port,
                advertise_host,
                advertise_port):
    """Construct a :class:`TeethAgent` with its addresses resolved.

    A falsy ``advertise_host`` is replaced by the local IP address that faces
    the API; a falsy ``listen_host`` then defaults to the advertise host.
    """
    # Resolve the advertise host first: the listen host may fall back to it.
    advertise_host = advertise_host or _get_api_facing_ip_address(api_url)
    listen_host = listen_host or advertise_host

    # TODO(russellhaering): Load this from the configuration API
    mode_implementation = _load_mode_implementation('standby')

    listen_address = (listen_host, listen_port)
    advertise_address = (advertise_host, advertise_port)
    return TeethAgent(api_url,
                      listen_address,
                      advertise_address,
                      mode_implementation)

@ -16,39 +16,14 @@ limitations under the License.
import abc
import collections
import random
import socket
import threading
import time
import urlparse
import uuid
from cherrypy import wsgiserver
import pkg_resources
from stevedore import driver
import structlog
from teeth_rest import encoding
from teeth_rest import errors as rest_errors
from teeth_agent import api
from teeth_agent import errors
from teeth_agent import hardware
from teeth_agent import overlord_agent_api
class TeethAgentStatus(encoding.Serializable):
    """Serializable record of the agent's mode, start time and version."""

    def __init__(self, mode, started_at, version):
        self.mode, self.started_at, self.version = mode, started_at, version

    def serialize(self, view):
        """Return the status fields as an ordered mapping.

        ``view`` is accepted but is not consulted by this implementation.
        """
        fields = collections.OrderedDict()
        fields['mode'] = self.mode
        fields['started_at'] = self.started_at
        fields['version'] = self.version
        return fields
class AgentCommandStatus(object):
@ -143,207 +118,8 @@ class AsyncCommandResult(BaseCommandResult):
pass
class TeethAgentHeartbeater(threading.Thread):
    """Thread that repeatedly sends heartbeats to the agent API.

    Each heartbeat yields a deadline (an absolute ``time.time()`` value) for
    the next one; the thread sleeps for a randomly jittered fraction of the
    time remaining until that deadline. Failed heartbeats are retried with
    exponential backoff.
    """

    # If we could wait at most N seconds between heartbeats (or in case of an
    # error) we will instead wait r x N seconds, where r is a random value
    # between these multipliers.
    min_jitter_multiplier = 0.3
    max_jitter_multiplier = 0.6

    # Exponential backoff values used in case of an error. In reality we will
    # only wait a portion of either of these delays based on the jitter
    # multipliers.
    initial_delay = 1.0
    max_delay = 300.0
    backoff_factor = 2.7

    def __init__(self, agent):
        """Create a heartbeater reporting on behalf of ``agent``.

        :param agent: object providing ``api_url``, ``version``,
                      ``mode_implementation``, ``get_agent_mac_addr()`` and
                      ``get_agent_url()``.
        """
        super(TeethAgentHeartbeater, self).__init__()
        self.agent = agent
        self.api = overlord_agent_api.APIClient(agent.api_url)
        self.log = structlog.get_logger(api_url=agent.api_url)
        self.stop_event = threading.Event()
        self.error_delay = self.initial_delay

    def run(self):
        """Heartbeat in a loop until :meth:`stop` sets the stop event."""
        # The first heartbeat happens now
        self.log.info('starting heartbeater')
        interval = 0
        while not self.stop_event.wait(interval):
            next_heartbeat_by = self.do_heartbeat()
            interval_multiplier = random.uniform(self.min_jitter_multiplier,
                                                 self.max_jitter_multiplier)
            remaining = next_heartbeat_by - time.time()
            # A deadline already in the past would give a negative wait;
            # clamp so we simply heartbeat again immediately.
            interval = max(0.0, remaining * interval_multiplier)
            self.log.info('sleeping before next heartbeat', interval=interval)

    def do_heartbeat(self):
        """Send one heartbeat and return the deadline for the next one.

        On success the error backoff is reset and the value returned by the
        API client is used as the deadline. On any exception the error is
        logged, the deadline becomes "now + current error delay", and the
        error delay is multiplied by ``backoff_factor`` (capped at
        ``max_delay``) for the next failure.
        """
        try:
            deadline = self.api.heartbeat(
                mac_addr=self.agent.get_agent_mac_addr(),
                url=self.agent.get_agent_url(),
                version=self.agent.version,
                mode=self.agent.mode_implementation.name)
            self.error_delay = self.initial_delay
            self.log.info('heartbeat successful')
        except Exception as e:
            # Deliberately broad: a failed heartbeat must never kill the
            # thread, it is simply retried after a backoff.
            self.log.error('error sending heartbeat', exception=e)
            deadline = time.time() + self.error_delay
            self.error_delay = min(self.error_delay * self.backoff_factor,
                                   self.max_delay)

        return deadline

    def stop(self):
        """Signal the loop to exit and wait for the thread to finish."""
        self.log.info('stopping heartbeater')
        self.stop_event.set()
        return self.join()
class BaseAgentMode(dict):
    """Dictionary subclass that carries a mode name and a bound logger."""

    def __init__(self, name):
        """Start as an empty dict with a logger bound to ``agent_mode``."""
        super(BaseAgentMode, self).__init__()
        mode_logger = structlog.get_logger(agent_mode=name)
        self.log = mode_logger
        self.name = name
class TeethAgent(object):
    """The agent: serves the command API and heartbeats to the agent API."""

    def __init__(self,
                 listen_host,
                 listen_port,
                 advertise_host,
                 advertise_port,
                 api_url):
        """Wire up the agent's collaborators; nothing is started here.

        :param listen_host: host the WSGI server binds to; if falsy it is
                            filled in from the advertise host in ``run``.
        :param listen_port: port the WSGI server binds to.
        :param advertise_host: host reported in heartbeats; if falsy it is
                               resolved in ``run``.
        :param advertise_port: port reported in heartbeats.
        :param api_url: URL of the API the heartbeater reports to.
        """
        self.listen_host = listen_host
        self.listen_port = listen_port
        self.advertise_host = advertise_host
        self.advertise_port = advertise_port
        self.api_url = api_url
        self.started_at = None
        self.mode_implementation = None
        self.version = pkg_resources.get_distribution('teeth-agent').version
        self.api = api.TeethAgentAPIServer(self)
        self.command_results = collections.OrderedDict()
        self.heartbeater = TeethAgentHeartbeater(self)
        self.hardware = hardware.HardwareInspector()
        self.command_lock = threading.Lock()
        self.log = structlog.get_logger()

    def get_status(self):
        """Retrieve a serializable status."""
        return TeethAgentStatus(
            mode=self.mode_implementation.name,
            started_at=self.started_at,
            version=self.version
        )

    def get_agent_url(self):
        """Return the ``http://host:port/`` URL built from the advertise
        host and port.
        """
        # If we put this behind any sort of proxy (ie, stunnel) we're going to
        # need to (re)think this.
        return 'http://{host}:{port}/'.format(host=self.advertise_host,
                                              port=self.advertise_port)

    def get_api_facing_ip_address(self):
        """Return the local IP address used to reach the API at ``api_url``.

        Note: this will raise an exception if anything goes wrong. That is
        expected to be fine, if we can't get to the agent API there isn't much
        point in starting up. Just crash and rely on the process manager to
        restart us in a sane fashion.

        :raises RuntimeError: if the URL scheme is neither http nor https.
        """
        api_addr = urlparse.urlparse(self.api_url)

        if api_addr.scheme not in ('http', 'https'):
            raise RuntimeError('API URL scheme must be one of \'http\' or '
                               '\'https\'.')

        # Fall back to the scheme's default port when the URL names none.
        api_port = api_addr.port or {'http': 80, 'https': 443}[api_addr.scheme]
        api_host = api_addr.hostname

        self.log.info('attempting to resolve listen IP',
                      api_host=api_host,
                      api_port=api_port)

        conn = socket.create_connection((api_host, api_port))
        try:
            listen_ip = conn.getsockname()[0]
        finally:
            # Always release the socket, even if getsockname() fails.
            conn.close()

        self.log.info('resolved listen IP', listen_ip=listen_ip)

        return listen_ip

    def load_mode_implementation(self, mode_name):
        """Load the ``teeth_agent.modes`` driver named ``mode_name`` and
        return the driver object.
        """
        mgr = driver.DriverManager(
            namespace='teeth_agent.modes',
            name=mode_name,
            invoke_on_load=True,
            invoke_args=[],
        )
        return mgr.driver

    def get_agent_mac_addr(self):
        """Return the primary MAC address reported by the hardware
        inspector.
        """
        return self.hardware.get_primary_mac_address()

    def list_command_results(self):
        """Return all recorded command results, oldest first."""
        return self.command_results.values()

    def get_command_result(self, result_id):
        """Return the command result stored under ``result_id``.

        :raises errors.RequestedObjectNotFoundError: if no such result
            exists.
        """
        try:
            return self.command_results[result_id]
        except KeyError:
            raise errors.RequestedObjectNotFoundError('Command Result',
                                                      result_id)

    def execute_command(self, command_name, **kwargs):
        """Execute an agent command.

        Runs under ``command_lock``. The command is looked up by name in the
        mode implementation and called with the command name followed by
        ``kwargs``. The resulting command result is stored under its ``id``
        and returned.

        :raises errors.CommandExecutionError: if the most recently recorded
            command is not done yet.
        :raises errors.InvalidCommandError: if the mode implementation has no
            command called ``command_name``.
        :raises rest_errors.InvalidContentError: propagated unchanged from
            the command; nothing is recorded in that case.
        """
        with self.command_lock:
            if self.command_results:
                # Most recently recorded result; the OrderedDict keeps
                # insertion order so it is the last value.
                last_command = next(reversed(self.command_results.values()))
                if not last_command.is_done():
                    raise errors.CommandExecutionError('agent is busy')

            if command_name not in self.mode_implementation:
                raise errors.InvalidCommandError(command_name)

            try:
                command_fn = self.mode_implementation[command_name]
                result = command_fn(command_name, **kwargs)
                if not isinstance(result, BaseCommandResult):
                    # Plain return values are wrapped as a successful
                    # synchronous result.
                    result = SyncCommandResult(command_name,
                                               kwargs,
                                               True,
                                               result)
            except rest_errors.InvalidContentError:
                # Any command may raise a InvalidContentError which will be
                # returned to the caller directly. A bare raise keeps the
                # original traceback intact.
                raise
            except Exception as e:
                # Other errors are considered command execution errors, and
                # are recorded as an unsuccessful synchronous result.
                result = SyncCommandResult(command_name, kwargs, False, e)

            self.command_results[result.id] = result
            return result

    def run(self):
        """Run the Teeth Agent.

        Records the start time, resolves the advertise and listen hosts when
        they were not supplied, loads the 'standby' mode, starts the
        heartbeater and serves the API. If ``server.start()`` raises anything
        (including ``KeyboardInterrupt``/``SystemExit``), the exception is
        logged and both the server and the heartbeater are stopped.
        """
        self.started_at = time.time()

        if not self.advertise_host:
            self.advertise_host = self.get_api_facing_ip_address()

        if not self.listen_host:
            self.listen_host = self.advertise_host

        self.mode_implementation = self.load_mode_implementation('standby')
        self.heartbeater.start()
        listen_address = (self.listen_host, self.listen_port)
        server = wsgiserver.CherryPyWSGIServer(listen_address, self.api)

        try:
            server.start()
        except BaseException as e:
            self.log.error('shutting down', exception=e)
            server.stop()
            self.heartbeater.stop()