236 lines
8.0 KiB
Python
Raw Normal View History

2014-01-15 15:22:12 -08:00
"""
Copyright 2013 Rackspace, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import collections
import random
import threading
import time
import pkg_resources
from stevedore import driver
2014-03-07 15:36:22 -08:00
from wsgiref import simple_server
2014-01-15 15:22:12 -08:00
2014-03-07 15:36:22 -08:00
from teeth_agent.api import app
2014-01-15 15:22:12 -08:00
from teeth_agent import base
from teeth_agent import encoding
2014-01-15 15:22:12 -08:00
from teeth_agent import errors
from teeth_agent import hardware
2014-03-17 15:17:27 -07:00
from teeth_agent.openstack.common import log
2014-01-15 15:22:12 -08:00
from teeth_agent import overlord_agent_api
2014-03-17 15:17:27 -07:00
def _time():
"""Wraps time.time() for simpler testing."""
return time.time()
2014-01-15 15:22:12 -08:00
class TeethAgentStatus(encoding.Serializable):
def __init__(self, mode, started_at, version):
self.mode = mode
self.started_at = started_at
self.version = version
def serialize(self):
2014-01-15 15:22:12 -08:00
"""Turn the status into a dict."""
return collections.OrderedDict([
('mode', self.mode),
('started_at', self.started_at),
('version', self.version),
])
class TeethAgentHeartbeater(threading.Thread):
# If we could wait at most N seconds between heartbeats (or in case of an
# error) we will instead wait r x N seconds, where r is a random value
# between these multipliers.
min_jitter_multiplier = 0.3
max_jitter_multiplier = 0.6
# Exponential backoff values used in case of an error. In reality we will
# only wait a portion of either of these delays based on the jitter
# multipliers.
initial_delay = 1.0
max_delay = 300.0
backoff_factor = 2.7
def __init__(self, agent):
super(TeethAgentHeartbeater, self).__init__()
self.agent = agent
self.hardware = hardware.get_manager()
2014-01-15 15:22:12 -08:00
self.api = overlord_agent_api.APIClient(agent.api_url)
2014-03-17 15:17:27 -07:00
self.log = log.getLogger(__name__)
2014-01-15 15:22:12 -08:00
self.stop_event = threading.Event()
self.error_delay = self.initial_delay
def run(self):
# The first heartbeat happens now
self.log.info('starting heartbeater')
interval = 0
while not self.stop_event.wait(interval):
next_heartbeat_by = self.do_heartbeat()
interval_multiplier = random.uniform(self.min_jitter_multiplier,
self.max_jitter_multiplier)
2014-03-17 15:17:27 -07:00
interval = (next_heartbeat_by - _time()) * interval_multiplier
log_msg = 'sleeping before next heartbeat, interval: {0}'
self.log.info(log_msg.format(interval))
2014-01-15 15:22:12 -08:00
def do_heartbeat(self):
try:
deadline = self.api.heartbeat(
hardware_info=self.hardware.list_hardware_info(),
2014-01-15 15:22:12 -08:00
version=self.agent.version,
mode=self.agent.get_mode_name())
2014-01-15 15:22:12 -08:00
self.error_delay = self.initial_delay
self.log.info('heartbeat successful')
except Exception as e:
2014-03-17 15:17:27 -07:00
self.log.error('error sending heartbeat')
self.log.exception(e)
deadline = _time() + self.error_delay
2014-01-15 15:22:12 -08:00
self.error_delay = min(self.error_delay * self.backoff_factor,
self.max_delay)
pass
return deadline
def stop(self):
self.log.info('stopping heartbeater')
self.stop_event.set()
return self.join()
class TeethAgent(object):
2014-03-14 16:19:20 -07:00
def __init__(self, api_url, listen_address, ipaddr):
2014-01-15 15:22:12 -08:00
self.api_url = api_url
self.listen_address = listen_address
2014-03-14 16:19:20 -07:00
self.ipaddr = ipaddr
self.mode_implementation = None
2014-01-15 15:22:12 -08:00
self.version = pkg_resources.get_distribution('teeth-agent').version
2014-03-11 14:13:13 -07:00
self.api = app.VersionSelectorApplication(self)
2014-01-15 15:22:12 -08:00
self.command_results = collections.OrderedDict()
self.heartbeater = TeethAgentHeartbeater(self)
2014-01-22 17:28:24 -08:00
self.hardware = hardware.get_manager()
2014-01-15 15:22:12 -08:00
self.command_lock = threading.Lock()
2014-03-17 15:17:27 -07:00
self.log = log.getLogger(__name__)
2014-01-15 15:22:12 -08:00
self.started_at = None
def get_mode_name(self):
if self.mode_implementation:
return self.mode_implementation.name
else:
return 'NONE'
2014-01-15 15:22:12 -08:00
def get_status(self):
"""Retrieve a serializable status."""
return TeethAgentStatus(
mode=self.get_mode_name(),
2014-01-15 15:22:12 -08:00
started_at=self.started_at,
version=self.version
)
def get_agent_mac_addr(self):
return self.hardware.get_primary_mac_address()
def list_command_results(self):
return self.command_results.values()
def get_command_result(self, result_id):
try:
return self.command_results[result_id]
except KeyError:
raise errors.RequestedObjectNotFoundError('Command Result',
result_id)
def _split_command(self, command_name):
command_parts = command_name.split('.', 1)
if len(command_parts) != 2:
raise errors.InvalidCommandError(
'Command name must be of the form <mode>.<name>')
return (command_parts[0], command_parts[1])
def _verify_mode(self, mode_name, command_name):
if not self.mode_implementation:
try:
self.mode_implementation = _load_mode_implementation(mode_name)
except Exception:
raise errors.InvalidCommandError(
'Unknown mode: {}'.format(mode_name))
elif self.get_mode_name().lower() != mode_name:
raise errors.InvalidCommandError(
'Agent is already in {} mode'.format(self.get_mode_name()))
2014-01-15 15:22:12 -08:00
def execute_command(self, command_name, **kwargs):
"""Execute an agent command."""
with self.command_lock:
mode_part, command_part = self._split_command(command_name)
self._verify_mode(mode_part, command_part)
2014-01-15 15:22:12 -08:00
if len(self.command_results) > 0:
last_command = self.command_results.values()[-1]
if not last_command.is_done():
raise errors.CommandExecutionError('agent is busy')
try:
result = self.mode_implementation.execute(command_part,
2014-01-21 10:42:43 -08:00
**kwargs)
except errors.InvalidContentError as e:
2014-01-15 15:22:12 -08:00
# Any command may raise a InvalidContentError which will be
# returned to the caller directly.
raise e
except Exception as e:
# Other errors are considered command execution errors, and are
# recorded as an
2014-03-11 14:13:13 -07:00
result = base.SyncCommandResult(command_name,
kwargs,
False,
unicode(e))
2014-01-15 15:22:12 -08:00
self.command_results[result.id] = result
return result
def run(self):
"""Run the Teeth Agent."""
2014-03-17 15:17:27 -07:00
self.started_at = _time()
2014-01-15 15:22:12 -08:00
self.heartbeater.start()
2014-03-07 15:36:22 -08:00
wsgi = simple_server.make_server(
self.listen_address[0],
self.listen_address[1],
self.api,
server_class=simple_server.WSGIServer)
2014-01-15 15:22:12 -08:00
try:
2014-03-07 15:36:22 -08:00
wsgi.serve_forever()
2014-01-15 15:22:12 -08:00
except BaseException as e:
2014-03-17 15:17:27 -07:00
self.log.error('shutting down')
self.log.exception(e)
2014-01-15 15:22:12 -08:00
self.heartbeater.stop()
def _load_mode_implementation(mode_name):
mgr = driver.DriverManager(
namespace='teeth_agent.modes',
name=mode_name.lower(),
2014-01-15 15:22:12 -08:00
invoke_on_load=True,
invoke_args=[],
)
return mgr.driver
2014-03-14 16:19:20 -07:00
def build_agent(api_url, listen_host, listen_port, ipaddr):
return TeethAgent(api_url, (listen_host, listen_port), ipaddr)