272 lines
9.3 KiB
Python
Raw Normal View History

2013-12-17 15:13:30 -08:00
"""
Copyright 2013 Rackspace, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import abc
import collections
import logging
import random
import threading
import time
import uuid

import pkg_resources
from teeth_rest import encoding
from teeth_rest import errors as rest_errors
from werkzeug import serving

from teeth_agent import api
from teeth_agent import errors
from teeth_agent import hardware
from teeth_agent import overlord_agent_api
2013-12-17 15:13:30 -08:00
2013-12-18 15:47:48 -08:00
class TeethAgentStatus(encoding.Serializable):
    """Serializable snapshot of the agent's mode, start time and version."""

    def __init__(self, mode, started_at, version):
        """Store the three status fields exactly as given."""
        self.mode = mode
        self.started_at = started_at
        self.version = version

    def serialize(self, view):
        """Turn the status into an ordered dict.

        The ``view`` argument is accepted but not used here. Keys are
        emitted in the order: mode, started_at, version.
        """
        status = collections.OrderedDict()
        status['mode'] = self.mode
        status['started_at'] = self.started_at
        status['version'] = self.version
        return status
2014-01-02 13:17:15 -08:00
class AgentCommandStatus(object):
    """String constants naming the states a command result can be in."""

    # Initial state assigned to every newly created command result.
    RUNNING = 'RUNNING'
    # Terminal states.
    SUCCEEDED = 'SUCCEEDED'
    FAILED = 'FAILED'
class BaseCommandResult(encoding.Serializable):
    """Record of a single command invocation and its outcome.

    A new result gets a random UUID string as its ``id`` and starts in the
    RUNNING state with no error and no result.
    """

    def __init__(self, command_name, command_params):
        self.id = str(uuid.uuid4())
        self.command_name = command_name
        self.command_params = command_params
        self.command_status = AgentCommandStatus.RUNNING
        self.command_error = None
        self.command_result = None

    def serialize(self, view):
        """Return the result's fields as an ordered dict.

        The ``view`` argument is accepted but not used here.
        """
        # Attribute names double as the serialized keys; order is preserved.
        field_names = (
            'id',
            'command_name',
            'command_params',
            'command_status',
            'command_error',
            'command_result',
        )
        return collections.OrderedDict(
            [(field, getattr(self, field)) for field in field_names])

    def is_done(self):
        """Return True once the command is no longer in the RUNNING state."""
        still_running = self.command_status == AgentCommandStatus.RUNNING
        return not still_running
2014-01-02 13:17:15 -08:00
class SyncCommandResult(BaseCommandResult):
    """Result of a command that had already finished when it was recorded."""

    def __init__(self, command_name, command_params, success, result_or_error):
        """Record a finished command.

        When ``success`` is truthy, ``result_or_error`` is stored as the
        command result and the status becomes SUCCEEDED; otherwise it is
        stored as the command error and the status becomes FAILED.
        """
        super(SyncCommandResult, self).__init__(command_name,
                                                command_params)
        if not success:
            self.command_status = AgentCommandStatus.FAILED
            self.command_error = result_or_error
            return

        self.command_status = AgentCommandStatus.SUCCEEDED
        self.command_result = result_or_error
class AsyncCommandResult(BaseCommandResult):
    """A command that executes asynchronously in the background. Subclasses
    should override `execute` to implement actual command execution.

    The command runs on a dedicated thread created in the constructor; reads
    and writes of the status/result/error fields made by this class are
    guarded by ``command_state_lock``.
    """

    def __init__(self, command_name, command_params):
        super(AsyncCommandResult, self).__init__(command_name, command_params)
        self.command_state_lock = threading.Lock()
        # The thread is only created here; it does not run until start().
        self.execution_thread = threading.Thread(
            target=self.run,
            name='agent-command-{}'.format(self.id))

    def serialize(self, view):
        """Serialize under the state lock so the fields are read together."""
        with self.command_state_lock:
            serialized = super(AsyncCommandResult, self).serialize(view)
        return serialized

    def start(self):
        """Start the background thread and return this object."""
        self.execution_thread.start()
        return self

    def join(self):
        """Block until the background thread finishes; return this object."""
        self.execution_thread.join()
        return self

    def is_done(self):
        """Return True once the command has left the RUNNING state."""
        with self.command_state_lock:
            finished = super(AsyncCommandResult, self).is_done()
        return finished

    def run(self):
        """Thread target: call `execute` and record its outcome.

        A return value is stored as the result with status SUCCEEDED. Any
        exception is stored as the error with status FAILED; exceptions that
        are not RESTError instances are first wrapped in a
        CommandExecutionError built from their string form.
        """
        try:
            outcome = self.execute()
            with self.command_state_lock:
                self.command_result = outcome
                self.command_status = AgentCommandStatus.SUCCEEDED
        except Exception as exc:
            if isinstance(exc, rest_errors.RESTError):
                failure = exc
            else:
                failure = errors.CommandExecutionError(str(exc))

            with self.command_state_lock:
                self.command_error = failure
                self.command_status = AgentCommandStatus.FAILED

    # NOTE(review): abc.abstractmethod only blocks instantiation when the
    # class uses abc.ABCMeta as its metaclass; none is set in this class --
    # confirm whether a base class provides one.
    @abc.abstractmethod
    def execute(self):
        """Perform the command's work and return its result."""
        pass
2014-01-07 14:28:05 -08:00
class TeethAgentHeartbeater(threading.Thread):
    """Background thread that repeatedly sends heartbeats for an agent.

    Each heartbeat yields a deadline for the next one; the thread then sleeps
    for a randomly chosen fraction of the time remaining until that deadline.
    Failed heartbeats are retried using an exponentially growing delay.
    """

    # If we could wait at most N seconds between heartbeats (or in case of an
    # error) we will instead wait r x N seconds, where r is a random value
    # between these multipliers.
    min_jitter_multiplier = 0.3
    max_jitter_multiplier = 0.6

    # Exponential backoff values used in case of an error. In reality we will
    # only wait a portion of either of these delays based on the jitter
    # multipliers.
    initial_delay = 1.0
    max_delay = 300.0
    backoff_factor = 2.7

    def __init__(self, agent):
        """Create the heartbeater for ``agent``.

        An API client is built from ``agent.api_url``; the thread is not
        started here.
        """
        super(TeethAgentHeartbeater, self).__init__()
        self.agent = agent
        self.api = overlord_agent_api.APIClient(agent.api_url)
        self.stop_event = threading.Event()
        self.error_delay = self.initial_delay

    def run(self):
        """Heartbeat in a loop until `stop` sets the stop event."""
        # The first heartbeat happens now
        interval = 0
        while not self.stop_event.wait(interval):
            next_heartbeat_by = self.do_heartbeat()
            interval_multiplier = random.uniform(self.min_jitter_multiplier,
                                                 self.max_jitter_multiplier)
            remaining = next_heartbeat_by - time.time()
            # A deadline that has already passed means "heartbeat again
            # immediately"; never hand a negative timeout to wait().
            interval = max(0.0, remaining * interval_multiplier)

    def do_heartbeat(self):
        """Send one heartbeat and return the deadline for the next one.

        On success the deadline is whatever the API client returns and the
        error backoff is reset to ``initial_delay``. On any exception the
        failure is logged, the deadline becomes "now + current error delay",
        and the error delay is multiplied by ``backoff_factor`` (capped at
        ``max_delay``). This method does not propagate exceptions raised by
        the heartbeat call.
        """
        try:
            deadline = self.api.heartbeat(
                mac_addr=self.agent.get_agent_mac_addr(),
                url=self.agent.get_agent_url(),
                version=self.agent.version,
                mode=self.agent.mode)
        except Exception:
            # Best effort: keep the thread alive, but leave a trace of the
            # failure instead of silently discarding it.
            logging.getLogger(__name__).exception(
                'Heartbeat failed; retrying with backoff')
            deadline = time.time() + self.error_delay
            self.error_delay = min(self.error_delay * self.backoff_factor,
                                   self.max_delay)
        else:
            self.error_delay = self.initial_delay
        return deadline

    def stop(self):
        """Signal the loop to exit and wait for the thread to finish."""
        self.stop_event.set()
        return self.join()
2014-01-07 14:28:05 -08:00
class BaseTeethAgent(object):
    """Base agent: serves the agent API, heartbeats and executes commands.

    Commands are looked up by name in ``command_map``, which starts empty
    (presumably populated by subclasses -- confirm against callers). Every
    executed command's result is kept in ``command_results`` in insertion
    order, keyed by result id.
    """

    def __init__(self, listen_host, listen_port, api_url, mode):
        self.listen_host = listen_host
        self.listen_port = listen_port
        self.api_url = api_url
        # Set by run(); None means the agent has not been started yet.
        self.started_at = None
        self.mode = mode
        self.version = pkg_resources.get_distribution('teeth-agent').version
        self.api = api.TeethAgentAPIServer(self)
        self.command_results = collections.OrderedDict()
        self.command_map = {}
        self.heartbeater = TeethAgentHeartbeater(self)
        self.hardware = hardware.HardwareInspector()
        self.command_lock = threading.Lock()

    def get_status(self):
        """Retrieve a serializable status."""
        return TeethAgentStatus(
            mode=self.mode,
            started_at=self.started_at,
            version=self.version
        )

    def get_agent_url(self):
        """Return the HTTP URL built from the listen host and port."""
        # If we put this behind any sort of proxy (ie, stunnel) we're going to
        # need to (re)think this.
        return 'http://{host}:{port}/'.format(host=self.listen_host,
                                              port=self.listen_port)

    def get_agent_mac_addr(self):
        """Return the primary MAC address reported by the hardware inspector."""
        return self.hardware.get_primary_mac_address()

    def list_command_results(self):
        """Return a list of all recorded command results, oldest first."""
        # Materialize a list so callers get the same type on Python 2 and 3.
        return list(self.command_results.values())

    def get_command_result(self, result_id):
        """Return the command result stored under ``result_id``.

        Raises RequestedObjectNotFoundError if no such result exists.
        """
        try:
            return self.command_results[result_id]
        except KeyError:
            raise errors.RequestedObjectNotFoundError('Command Result',
                                                      result_id)

    def _get_last_command_result(self):
        """Return the most recently recorded result, or None if there is none."""
        if not self.command_results:
            return None
        # reversed() works on both a Python 2 list and a Python 3
        # OrderedDict values view, unlike indexing with [-1].
        return next(reversed(self.command_results.values()))

    def execute_command(self, command_name, **kwargs):
        """Execute an agent command.

        Raises CommandExecutionError if the most recent command is still
        running, and InvalidCommandError if ``command_name`` is not in
        ``command_map``. An InvalidContentError raised by the command is
        propagated to the caller; any other exception is recorded as a failed
        SyncCommandResult. The (possibly failed) result is stored in
        ``command_results`` and returned.
        """
        with self.command_lock:
            last_command = self._get_last_command_result()
            if last_command is not None and not last_command.is_done():
                raise errors.CommandExecutionError('agent is busy')

            if command_name not in self.command_map:
                raise errors.InvalidCommandError(command_name)

            try:
                result = self.command_map[command_name](command_name, **kwargs)
                # Commands may return a plain value; wrap it as a success.
                if not isinstance(result, BaseCommandResult):
                    result = SyncCommandResult(command_name,
                                               kwargs,
                                               True,
                                               result)
            except rest_errors.InvalidContentError:
                # Any command may raise a InvalidContentError which will be
                # returned to the caller directly. A bare raise keeps the
                # original traceback.
                raise
            except Exception as e:
                # Other errors are considered command execution errors, and
                # are recorded as a failed command result.
                result = SyncCommandResult(command_name, kwargs, False, e)

            self.command_results[result.id] = result
            return result

    def run(self):
        """Run the Teeth Agent.

        Starts the heartbeater, then serves the API until the server returns.
        Raises RuntimeError if the agent was already started.
        """
        if self.started_at:
            raise RuntimeError('Agent was already started')

        self.started_at = time.time()
        self.heartbeater.start()
        try:
            serving.run_simple(self.listen_host, self.listen_port, self.api)
        finally:
            # Always stop the heartbeat thread, even if the server raised,
            # so it cannot keep the process alive.
            self.heartbeater.stop()