2013-12-17 15:13:30 -08:00
|
|
|
"""
|
|
|
|
Copyright 2013 Rackspace, Inc.
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
"""
|
|
|
|
|
2014-01-05 21:46:22 -08:00
|
|
|
import abc
|
2013-12-19 16:59:16 -08:00
|
|
|
import collections
|
2014-01-07 14:28:05 -08:00
|
|
|
import random
|
2014-01-13 16:29:45 -08:00
|
|
|
import socket
|
2014-01-05 21:46:22 -08:00
|
|
|
import threading
|
2013-12-17 15:13:30 -08:00
|
|
|
import time
|
2014-01-13 16:29:45 -08:00
|
|
|
import urlparse
|
2014-01-02 13:17:15 -08:00
|
|
|
import uuid
|
2013-12-17 15:13:30 -08:00
|
|
|
|
2014-01-10 13:49:48 -08:00
|
|
|
from cherrypy import wsgiserver
|
2013-12-18 15:47:48 -08:00
|
|
|
import pkg_resources
|
2014-01-11 21:19:12 -08:00
|
|
|
import structlog
|
2013-12-18 15:47:48 -08:00
|
|
|
from teeth_rest import encoding
|
2014-01-02 13:17:15 -08:00
|
|
|
from teeth_rest import errors as rest_errors
|
2013-12-17 15:13:30 -08:00
|
|
|
|
2013-12-18 15:47:48 -08:00
|
|
|
from teeth_agent import api
|
2013-12-21 17:22:09 -08:00
|
|
|
from teeth_agent import errors
|
2014-01-07 15:58:25 -08:00
|
|
|
from teeth_agent import hardware
|
2014-01-07 14:28:05 -08:00
|
|
|
from teeth_agent import overlord_agent_api
|
2013-12-17 15:13:30 -08:00
|
|
|
|
|
|
|
|
2013-12-18 15:47:48 -08:00
|
|
|
class TeethAgentStatus(encoding.Serializable):
    """Snapshot of the agent's mode, start time and version."""

    def __init__(self, mode, started_at, version):
        self.mode, self.started_at, self.version = mode, started_at, version

    def serialize(self, view):
        """Turn the status into a dict.

        Keys appear in the order mode, started_at, version. ``view`` is not
        consulted.
        """
        field_names = ('mode', 'started_at', 'version')
        return collections.OrderedDict(
            (name, getattr(self, name)) for name in field_names)
|
|
|
|
|
|
|
|
|
2014-01-02 13:17:15 -08:00
|
|
|
class AgentCommandStatus(object):
    """String constants naming the lifecycle state of a command result."""

    # Results are created RUNNING and move to SUCCEEDED or FAILED once the
    # command's outcome is recorded.
    RUNNING, SUCCEEDED, FAILED = 'RUNNING', 'SUCCEEDED', 'FAILED'
|
|
|
|
|
|
|
|
|
|
|
|
class BaseCommandResult(encoding.Serializable):
    """Common state shared by every agent command result.

    A new result gets a random UUID string as its ``id`` and starts in the
    RUNNING state with neither an error nor a result recorded.
    """

    def __init__(self, command_name, command_params):
        self.id = str(uuid.uuid4())
        self.command_name = command_name
        self.command_params = command_params
        self.command_status = AgentCommandStatus.RUNNING
        self.command_error = None
        self.command_result = None

    def serialize(self, view):
        """Return the result's fields as an ordered dict (``view`` unused)."""
        keys = ('id', 'command_name', 'command_params', 'command_status',
                'command_error', 'command_result')
        return collections.OrderedDict([(key, getattr(self, key))
                                        for key in keys])

    def is_done(self):
        """Return True once the status is anything other than RUNNING."""
        still_running = AgentCommandStatus.RUNNING
        return self.command_status != still_running

    def join(self):
        """Return ``self`` without blocking.

        Results that execute in the background override this to wait.
        """
        return self
|
|
|
|
|
2014-01-02 13:17:15 -08:00
|
|
|
|
|
|
|
class SyncCommandResult(BaseCommandResult):
    """Result of a command whose outcome is already known at construction.

    ``success`` selects whether ``result_or_error`` is stored as the command's
    result (status SUCCEEDED) or as its error (status FAILED).
    """

    def __init__(self, command_name, command_params, success, result_or_error):
        super(SyncCommandResult, self).__init__(command_name, command_params)
        if not success:
            self.command_status = AgentCommandStatus.FAILED
            self.command_error = result_or_error
            return
        self.command_status = AgentCommandStatus.SUCCEEDED
        self.command_result = result_or_error
|
|
|
|
|
|
|
|
|
2014-01-05 21:46:22 -08:00
|
|
|
class AsyncCommandResult(BaseCommandResult):
    """A command that executes asynchronously in the background. Subclasses
    should override `execute` to implement actual command execution.

    The background thread writes ``command_status``, ``command_result`` and
    ``command_error`` while holding ``command_state_lock``. ``serialize`` and
    ``is_done`` read them under the same lock.
    """

    def __init__(self, command_name, command_params):
        super(AsyncCommandResult, self).__init__(command_name, command_params)
        self.command_state_lock = threading.Lock()

        thread_name = 'agent-command-{}'.format(self.id)
        self.execution_thread = threading.Thread(target=self.run,
                                                 name=thread_name)

    def serialize(self, view):
        """Serialize under the state lock so fields are read consistently."""
        with self.command_state_lock:
            return super(AsyncCommandResult, self).serialize(view)

    def start(self):
        """Start the background execution thread and return ``self``."""
        self.execution_thread.start()
        return self

    def join(self):
        """Block until the execution thread finishes, then return ``self``."""
        self.execution_thread.join()
        return self

    def is_done(self):
        """Return True once the command has left RUNNING (read under lock)."""
        with self.command_state_lock:
            return super(AsyncCommandResult, self).is_done()

    def run(self):
        """Thread target: call ``execute`` and record its outcome.

        On success the return value becomes ``command_result`` and the status
        becomes SUCCEEDED. On any ``Exception`` the status becomes FAILED and
        the (possibly wrapped) error is stored in ``command_error``.
        """
        try:
            result = self.execute()
        except Exception as e:
            error = self._wrap_error(e)
            with self.command_state_lock:
                self.command_error = error
                self.command_status = AgentCommandStatus.FAILED
        else:
            with self.command_state_lock:
                self.command_result = result
                self.command_status = AgentCommandStatus.SUCCEEDED

    @staticmethod
    def _wrap_error(e):
        """Return ``e`` in a form suitable for ``command_error``.

        ``RESTError`` instances are kept as-is. Anything else is wrapped in
        ``CommandExecutionError`` carrying the exception's message.
        """
        if isinstance(e, rest_errors.RESTError):
            return e
        # On Python 2, str() of an exception whose message is a non-ASCII
        # unicode string raises UnicodeEncodeError. If that escaped run(), the
        # thread would die with the status stuck at RUNNING, and the agent
        # would refuse every later command as busy. Fall back to repr().
        try:
            message = str(e)
        except UnicodeError:
            message = repr(e)
        return errors.CommandExecutionError(message)

    # NOTE(review): abc.abstractmethod is only enforced when the class's
    # metaclass is abc.ABCMeta; whether encoding.Serializable provides that
    # is not visible here.
    @abc.abstractmethod
    def execute(self):
        """Perform the command's work and return its result.

        Runs on the background thread started by ``start``. Subclasses must
        override this.
        """
        pass
|
|
|
|
|
|
|
|
|
2014-01-07 14:28:05 -08:00
|
|
|
class TeethAgentHeartbeater(threading.Thread):
    """Background thread that periodically heartbeats to the agent API.

    After each heartbeat the thread sleeps for a random fraction of the time
    remaining until the next heartbeat is due. Failed heartbeats use an
    exponentially growing delay, capped at ``max_delay``, as that due time.
    """

    # If we could wait at most N seconds between heartbeats (or in case of an
    # error) we will instead wait r x N seconds, where r is a random value
    # between these multipliers.
    min_jitter_multiplier = 0.3
    max_jitter_multiplier = 0.6

    # Exponential backoff values used in case of an error. In reality we will
    # only wait a portion of either of these delays based on the jitter
    # multipliers.
    initial_delay = 1.0
    max_delay = 300.0
    backoff_factor = 2.7

    def __init__(self, agent):
        super(TeethAgentHeartbeater, self).__init__()
        self.agent = agent
        self.api = overlord_agent_api.APIClient(agent.api_url)
        self.log = structlog.get_logger(api_url=agent.api_url)
        self.stop_event = threading.Event()
        self.error_delay = self.initial_delay

    def run(self):
        """Heartbeat repeatedly until ``stop`` sets the stop event."""
        self.log.info('starting heartbeater')
        # Waiting zero seconds first means the first heartbeat is sent now.
        delay = 0
        while True:
            if self.stop_event.wait(delay):
                break
            due_at = self.do_heartbeat()
            jitter = random.uniform(self.min_jitter_multiplier,
                                    self.max_jitter_multiplier)
            delay = (due_at - time.time()) * jitter
            self.log.info('sleeping before next heartbeat', interval=delay)

    def do_heartbeat(self):
        """Send one heartbeat and return the time the next one is due.

        On success the deadline returned by the API is used and the backoff
        delay is reset. On any ``Exception`` the error is logged, the deadline
        becomes ``error_delay`` seconds from now, and ``error_delay`` is
        multiplied by ``backoff_factor`` up to ``max_delay``.
        """
        try:
            deadline = self.api.heartbeat(
                mac_addr=self.agent.get_agent_mac_addr(),
                url=self.agent.get_agent_url(),
                version=self.agent.version,
                mode=self.agent.mode)
            self.error_delay = self.initial_delay
            self.log.info('heartbeat successful')
        except Exception as e:
            self.log.error('error sending heartbeat', exception=e)
            deadline = time.time() + self.error_delay
            grown_delay = self.error_delay * self.backoff_factor
            self.error_delay = min(grown_delay, self.max_delay)
        return deadline

    def stop(self):
        """Signal the heartbeat loop to exit and wait for the thread."""
        self.log.info('stopping heartbeater')
        self.stop_event.set()
        return self.join()
|
|
|
|
|
2014-01-07 14:28:05 -08:00
|
|
|
|
2013-12-20 12:57:38 -08:00
|
|
|
class BaseTeethAgent(object):
    """Core agent: serves the agent API, heartbeats, and executes commands.

    ``command_map`` maps command names to callables invoked as
    ``handler(command_name, **kwargs)``. It starts empty here, so handlers
    must be registered elsewhere (TODO confirm: presumably by subclasses).
    """

    def __init__(self,
                 listen_host,
                 listen_port,
                 advertise_host,
                 advertise_port,
                 api_url,
                 mode):
        """
        :param listen_host: host/IP the API server binds to; if falsy, ``run``
            falls back to the advertise host.
        :param listen_port: port the API server binds to.
        :param advertise_host: host placed in the agent URL sent with
            heartbeats; if falsy, ``run`` resolves it with
            ``get_api_facing_ip_address``.
        :param advertise_port: port placed in the agent URL sent with
            heartbeats.
        :param api_url: base URL of the agent API (http or https).
        :param mode: agent mode, reported in status and heartbeats.
        """
        self.listen_host = listen_host
        self.listen_port = listen_port
        self.advertise_host = advertise_host
        self.advertise_port = advertise_port
        self.api_url = api_url
        self.started_at = None
        self.mode = mode
        self.version = pkg_resources.get_distribution('teeth-agent').version
        self.api = api.TeethAgentAPIServer(self)
        # Insertion-ordered so the most recent command is always last.
        self.command_results = collections.OrderedDict()
        self.command_map = {}
        self.heartbeater = TeethAgentHeartbeater(self)
        self.hardware = hardware.HardwareInspector()
        self.command_lock = threading.Lock()
        self.log = structlog.get_logger()

    def get_status(self):
        """Retrieve a serializable status."""
        return TeethAgentStatus(
            mode=self.mode,
            started_at=self.started_at,
            version=self.version
        )

    def get_agent_url(self):
        """Return the URL this agent advertises for its own API."""
        # If we put this behind any sort of proxy (ie, stunnel) we're going to
        # need to (re)think this.
        return 'http://{host}:{port}/'.format(host=self.advertise_host,
                                              port=self.advertise_port)

    def get_api_facing_ip_address(self):
        """Return the local IP address of the interface that reaches the API.

        Note: this will raise an exception if anything goes wrong. That is
        expected to be fine, if we can't get to the agent API there isn't much
        point in starting up. Just crash and rely on the process manager to
        restart us in a sane fashion.

        :raises RuntimeError: if ``api_url`` is not http or https.
        """
        api_addr = urlparse.urlparse(self.api_url)

        if api_addr.scheme not in ('http', 'https'):
            raise RuntimeError('API URL scheme must be one of \'http\' or '
                               '\'https\'.')

        api_port = api_addr.port or {'http': 80, 'https': 443}[api_addr.scheme]
        api_host = api_addr.hostname

        self.log.info('attempting to resolve listen IP',
                      api_host=api_host,
                      api_port=api_port)

        # Connecting makes the OS choose the outbound interface; the socket's
        # local address is that interface's IP. Close the socket even if
        # getsockname() fails (Python 2 sockets are not context managers).
        conn = socket.create_connection((api_host, api_port))
        try:
            listen_ip = conn.getsockname()[0]
        finally:
            conn.close()
        self.log.info('resolved listen IP', listen_ip=listen_ip)

        return listen_ip

    def get_agent_mac_addr(self):
        """Return the primary MAC address reported by the hardware layer."""
        return self.hardware.get_primary_mac_address()

    def list_command_results(self):
        """Return all recorded command results, oldest first, as a list."""
        return list(self.command_results.values())

    def get_command_result(self, result_id):
        """Return the command result with ``result_id``.

        :raises RequestedObjectNotFoundError: if no such result exists.
        """
        try:
            return self.command_results[result_id]
        except KeyError:
            raise errors.RequestedObjectNotFoundError('Command Result',
                                                      result_id)

    def execute_command(self, command_name, **kwargs):
        """Execute an agent command.

        Only one command may be in flight at a time. Handler return values
        that are not ``BaseCommandResult`` instances are wrapped in a
        successful ``SyncCommandResult``. The result is recorded and returned.

        :raises CommandExecutionError: if the most recent command is not done.
        :raises InvalidCommandError: if ``command_name`` is not registered.
        :raises InvalidContentError: propagated unchanged from the handler.
        """
        with self.command_lock:
            if self.command_results:
                # O(1) access to the most recent entry, without copying
                # every value into a list (works on Python 2 and 3).
                last_id = next(reversed(self.command_results))
                last_command = self.command_results[last_id]
                if not last_command.is_done():
                    raise errors.CommandExecutionError('agent is busy')

            if command_name not in self.command_map:
                raise errors.InvalidCommandError(command_name)

            try:
                handler = self.command_map[command_name]
                result = handler(command_name, **kwargs)
                if not isinstance(result, BaseCommandResult):
                    result = SyncCommandResult(command_name,
                                               kwargs,
                                               True,
                                               result)
            except rest_errors.InvalidContentError:
                # Any command may raise an InvalidContentError which will be
                # returned to the caller directly. A bare ``raise`` keeps the
                # original traceback (``raise e`` discards it on Python 2).
                raise
            except Exception as e:
                # Other errors are considered command execution errors, and
                # are recorded as a failed SyncCommandResult.
                result = SyncCommandResult(command_name, kwargs, False, e)

            self.command_results[result.id] = result
            return result

    def run(self):
        """Run the Teeth Agent.

        Resolves the advertise/listen hosts if they were not supplied, starts
        the heartbeater, and serves the API with CherryPy. An exception from
        ``server.start()``, including KeyboardInterrupt and SystemExit, is
        logged and the server is stopped; it is not re-raised. Once started,
        the heartbeater is stopped on every exit path.
        """
        self.started_at = time.time()

        if not self.advertise_host:
            self.advertise_host = self.get_api_facing_ip_address()

        if not self.listen_host:
            self.listen_host = self.advertise_host

        self.heartbeater.start()
        try:
            listen_address = (self.listen_host, self.listen_port)
            server = wsgiserver.CherryPyWSGIServer(listen_address, self.api)

            try:
                server.start()
            except BaseException as e:
                self.log.error('shutting down', exception=e)
                server.stop()
        finally:
            # Also reached if building the server raises. The heartbeater
            # thread is never marked daemon, so leaving it running would keep
            # the process alive and keep reporting the agent as healthy.
            self.heartbeater.stop()
|