# Copyright 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import ipaddress
import os
import random
import select
import socket
import threading
import time
from urllib import parse as urlparse

import eventlet
from ironic_lib import exception as lib_exc
from ironic_lib import mdns
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log
import pkg_resources

from ironic_python_agent.api import app
from ironic_python_agent import config
from ironic_python_agent import encoding
from ironic_python_agent import errors
from ironic_python_agent.extensions import base
from ironic_python_agent import hardware
from ironic_python_agent import inspector
from ironic_python_agent import ironic_api_client
from ironic_python_agent import utils

LOG = log.getLogger(__name__)

# Time(in seconds) to wait for any of the interfaces to be up
# before lookup of the node is attempted
NETWORK_WAIT_TIMEOUT = 60

# Time(in seconds) to wait before reattempt
NETWORK_WAIT_RETRY = 5

cfg.CONF.import_group('metrics', 'ironic_lib.metrics_utils')
cfg.CONF.import_group('metrics_statsd', 'ironic_lib.metrics_statsd')

# Address pair used for the agent's advertised address.
Host = collections.namedtuple('Host', ['hostname', 'port'])


def _time():
    """Wraps time.time() for simpler testing."""
    return time.time()


class IronicPythonAgentStatus(encoding.Serializable):
    """Represents the status of an agent."""

    serializable_fields = ('started_at', 'version')

    def __init__(self, started_at, version):
        """Record the agent start time and version.

        :param started_at: value stored as the agent's start time.
        :param version: value stored as the agent's version.
        """
        self.started_at = started_at
        self.version = version


class IronicPythonAgentHeartbeater(threading.Thread):
    """Thread that periodically heartbeats to Ironic."""

    # If we could wait at most N seconds between heartbeats (or in case of an
    # error) we will instead wait r x N seconds, where r is a random value
    # between these multipliers.
    min_jitter_multiplier = 0.3
    max_jitter_multiplier = 0.6

    # Exponential backoff values used in case of an error. In reality we will
    # only wait a portion of either of these delays based on the jitter
    # multipliers.
    initial_delay = 1.0
    max_delay = 300.0
    backoff_factor = 2.7

    def __init__(self, agent):
        """Initialize the heartbeat thread.

        :param agent: an :class:`ironic_python_agent.agent.IronicPythonAgent`
                      instance.
        """
        super(IronicPythonAgentHeartbeater, self).__init__()
        self.agent = agent
        self.api = agent.api_client
        self.error_delay = self.initial_delay
        # Both ends of the control pipe; they only exist while run() loops.
        self.reader = None
        self.writer = None

    def run(self):
        """Start the heartbeat thread.

        Loops until the byte ``a`` is read from the control pipe (see
        :meth:`stop`). Any other byte, or a poll timeout, triggers a
        heartbeat.
        """
        # The first heartbeat happens immediately
        LOG.info('starting heartbeater')
        interval = 0
        self.agent.set_agent_advertise_addr()

        self.reader, self.writer = os.pipe()
        p = select.poll()
        p.register(self.reader, select.POLLIN)
        try:
            while True:
                # poll() takes milliseconds; a non-empty result means a
                # control byte is waiting in the pipe.
                if p.poll(interval * 1000):
                    if os.read(self.reader, 1).decode() == 'a':
                        break

                self.do_heartbeat()
                interval_multiplier = random.uniform(
                    self.min_jitter_multiplier,
                    self.max_jitter_multiplier)
                interval = self.agent.heartbeat_timeout * interval_multiplier
                log_msg = 'sleeping before next heartbeat, interval: {}'
                LOG.info(log_msg.format(interval))
        finally:
            os.close(self.reader)
            os.close(self.writer)
            self.reader = None
            self.writer = None

    def do_heartbeat(self):
        """Send a heartbeat to Ironic.

        Failures are logged and never raised. ``error_delay`` is reset on
        success and grown by ``backoff_factor`` (capped at ``max_delay``)
        on any failure.
        """
        # NOTE(review): error_delay is maintained here, but run() in this
        # file does not read it when computing the next interval.
        try:
            self.api.heartbeat(
                uuid=self.agent.get_node_uuid(),
                advertise_address=self.agent.advertise_address
            )
            self.error_delay = self.initial_delay
            LOG.info('heartbeat successful')
        except errors.HeartbeatConflictError:
            LOG.warning('conflict error sending heartbeat to {}'.format(
                self.agent.api_url))
            self.error_delay = min(self.error_delay * self.backoff_factor,
                                   self.max_delay)
        except Exception:
            LOG.exception('error sending heartbeat to {}'.format(
                self.agent.api_url))
            self.error_delay = min(self.error_delay * self.backoff_factor,
                                   self.max_delay)

    def force_heartbeat(self):
        """Wake the heartbeat loop so that it heartbeats right away.

        Does nothing when the control pipe does not exist, i.e. before
        :meth:`run` has created it or after the loop has exited.
        """
        if self.writer is None:
            # Same guard as stop(): os.write(None, ...) would raise
            # TypeError into the caller.
            LOG.debug('heartbeater is not running, heartbeat not forced')
            return
        # Any byte other than 'a' makes run() fall through to do_heartbeat().
        os.write(self.writer, b'b')

    def stop(self):
        """Stop the heartbeat thread."""
        if self.writer is not None:
            LOG.info('stopping heartbeater')
            os.write(self.writer, b'a')
            return self.join()


class IronicPythonAgent(base.ExecuteCommandMixin):
    """Class for base agent functionality."""

    def __init__(self, api_url, advertise_address, listen_address,
                 ip_lookup_attempts, ip_lookup_sleep, network_interface,
                 lookup_timeout, lookup_interval, standalone, agent_token,
                 hardware_initialization_delay=0):
        """Initialize the agent.

        :param api_url: URL of the Ironic API. When empty or ``'mdns'`` and
                        the agent is not standalone, the endpoint is looked
                        up through mDNS.
        :param advertise_address: object with ``hostname`` and ``port``
                                  attributes, advertised in heartbeats.
        :param listen_address: stored on the agent; not used further in
                               this module.
        :param ip_lookup_attempts: number of attempts made to find a source
                                   IP for the route to Ironic.
        :param ip_lookup_sleep: seconds to sleep after a failed attempt.
        :param network_interface: if not None, interface whose IPv4 address
                                  is advertised.
        :param lookup_timeout: lookup timeout in seconds.
        :param lookup_interval: starting interval passed to the node lookup.
        :param standalone: when true, no lookup or heartbeat is performed.
        :param agent_token: initial agent token, may be None.
        :param hardware_initialization_delay: seconds to wait in
                                              :meth:`run` before proceeding.
        :raises: ServiceLookupFailure if mDNS was explicitly requested and
                 the lookup failed.
        """
        super(IronicPythonAgent, self).__init__()
        if bool(cfg.CONF.keyfile) != bool(cfg.CONF.certfile):
            LOG.warning("Only one of 'keyfile' and 'certfile' options is "
                        "defined in config file. Its value will be ignored.")
        self.ext_mgr = base.init_ext_manager(self)
        self.api_url = api_url
        if (not self.api_url or self.api_url == 'mdns') and not standalone:
            try:
                self.api_url, params = mdns.get_endpoint('baremetal')
            except lib_exc.ServiceLookupFailure:
                if self.api_url:
                    # mDNS explicitly requested, report failure.
                    raise
                else:
                    # implicit fallback to mDNS, do not fail (maybe we're only
                    # running inspection).
                    LOG.warning('Could not get baremetal endpoint from mDNS, '
                                'will not heartbeat')
            else:
                config.override(params)

        # NOTE: api_client and heartbeater only exist when an API URL is
        # known; users of them must check api_url first.
        if self.api_url:
            self.api_client = ironic_api_client.APIClient(self.api_url)
            self.heartbeater = IronicPythonAgentHeartbeater(self)
        self.listen_address = listen_address
        self.advertise_address = advertise_address
        self.version = pkg_resources.get_distribution('ironic-python-agent')\
            .version
        self.api = app.Application(self, cfg.CONF)
        self.heartbeat_timeout = None
        self.started_at = None
        self.node = None
        # lookup timeout in seconds
        self.lookup_timeout = lookup_timeout
        self.lookup_interval = lookup_interval
        self.ip_lookup_attempts = ip_lookup_attempts
        self.ip_lookup_sleep = ip_lookup_sleep
        self.network_interface = network_interface
        self.standalone = standalone
        self.hardware_initialization_delay = hardware_initialization_delay
        # IPA will stop serving requests and exit after this is set to False
        self.serve_api = True
        self.agent_token = agent_token
        # Allows this to be turned on by the conductor while running,
        # in the event of long running ramdisks where the conductor
        # got upgraded somewhere along the way.
        self.agent_token_required = cfg.CONF.agent_token_required
        self.iscsi_started = False

    def get_status(self):
        """Retrieve a serializable status.

        :returns: a :class:`ironic_python_agent.agent.IronicPythonAgentStatus`
                  instance describing the agent's status.
        """
        return IronicPythonAgentStatus(
            started_at=self.started_at,
            version=self.version
        )

    def validate_agent_token(self, token):
        """Check a token supplied with a request against the stored one.

        :param token: the token received with the request, may be None.
        :returns: True if the token matches the stored agent token, or if
                  no token was supplied while one is stored and tokens are
                  not required; False otherwise.
        """
        # We did not get a token, i.e. None and
        # we've previously seen a token, which is
        # a mid-cluster upgrade case with long-running ramdisks.
        if (not token and self.agent_token
                and not self.agent_token_required):
            # TODO(TheJulia): Rip this out during or after the V cycle.
            LOG.warning('Agent token for requests are not required '
                        'by the conductor, yet we received a token. '
                        'Cluster may be mid-upgrade. Support to '
                        'not fail in this condition will be removed in '
                        'the Victoria development cycle.')
            # Tell the API everything is okay.
            return True

        return self.agent_token == token

    def _get_route_source(self, dest):
        """Get the IP address to send packages to destination.

        :param dest: destination host passed to ``ip route get``.
        :returns: the source address of the route as a string, or None if
                  the command failed, no ``src`` was reported, the address
                  is link-local or the address is not a valid IP.
        """
        try:
            out, _err = utils.execute('ip', 'route', 'get', dest)
        except (EnvironmentError, processutils.ProcessExecutionError) as e:
            LOG.warning('Cannot get route to host %(dest)s: %(err)s',
                        {'dest': dest, 'err': e})
            return

        try:
            # The address is the token following "src" on the first line.
            source = out.strip().split('\n')[0].split('src')[1].split()[0]
        except IndexError:
            LOG.warning('No route to host %(dest)s, route record: %(rec)s',
                        {'dest': dest, 'rec': out})
            return

        try:
            if ipaddress.ip_address(source).is_link_local:
                LOG.info('Ignoring link-local source to %(dest)s: %(rec)s',
                         {'dest': dest, 'rec': out})
                return
        except ValueError as exc:
            LOG.warning('Invalid IP address %(addr)s returned as a route '
                        'to host %(dest)s: %(err)s',
                        {'dest': dest, 'addr': source, 'err': exc})
            return

        return source

    def set_agent_advertise_addr(self):
        """Set advertised IP address for the agent, if not already set.

        If agent's advertised IP address is still default (None), try to
        find a better one.  If the agent's network interface is None, replace
        that as well.

        :raises: LookupAgentIPError if an IP address could not be found
        """
        if self.advertise_address.hostname is not None:
            return

        found_ip = None
        if self.network_interface is not None:
            # TODO(dtantsur): deprecate this
            found_ip = hardware.dispatch_to_managers('get_ipv4_addr',
                                                     self.network_interface)
        else:
            url = urlparse.urlparse(self.api_url)
            ironic_host = url.hostname
            # Try resolving it in case it's not an IP address
            try:
                ironic_host = socket.gethostbyname(ironic_host)
            except socket.gaierror:
                LOG.debug('Could not resolve %s, maybe no DNS', ironic_host)

            for _ in range(self.ip_lookup_attempts):
                found_ip = self._get_route_source(ironic_host)
                if found_ip:
                    break

                time.sleep(self.ip_lookup_sleep)

        if found_ip:
            self.advertise_address = Host(hostname=found_ip,
                                          port=self.advertise_address.port)
        else:
            raise errors.LookupAgentIPError('Agent could not find a valid IP '
                                            'address.')

    def get_node_uuid(self):
        """Get UUID for Ironic node.

        If the agent has not yet heartbeated to Ironic, it will not have
        the UUID and this will raise an exception.

        :returns: A string containing the UUID for the Ironic node.
        :raises: UnknownNodeError if UUID is unknown.
        """
        if self.node is None or 'uuid' not in self.node:
            raise errors.UnknownNodeError()
        return self.node['uuid']

    def list_command_results(self):
        """Get a list of command results.

        :returns: list of :class:`ironic_python_agent.extensions.base.
                  BaseCommandResult` objects.
        """
        return list(self.command_results.values())

    def get_command_result(self, result_id):
        """Get a specific command result by ID.

        :returns: a :class:`ironic_python_agent.extensions.base.
                  BaseCommandResult` object.
        :raises: RequestedObjectNotFoundError if command with the given ID
                 is not found.
        """
        try:
            return self.command_results[result_id]
        except KeyError:
            raise errors.RequestedObjectNotFoundError('Command Result',
                                                      result_id)

    def force_heartbeat(self):
        """Ask the heartbeater to heartbeat immediately.

        Does nothing in standalone mode or when no API URL is known, since
        no heartbeater is created in ``__init__`` without an API URL.
        """
        if not self.standalone and self.api_url:
            self.heartbeater.force_heartbeat()

    def _wait_for_interface(self):
        """Wait until at least one interface is up.

        Polls the hardware managers every NETWORK_WAIT_RETRY seconds for up
        to NETWORK_WAIT_TIMEOUT seconds, and logs a warning if no interface
        with a MAC address shows up in that time.
        """
        wait_till = time.time() + NETWORK_WAIT_TIMEOUT
        while time.time() < wait_till:
            interfaces = hardware.dispatch_to_managers(
                'list_network_interfaces')
            if not any(ifc.mac_address for ifc in interfaces):
                LOG.debug('Network is not up yet. '
                          'No valid interfaces found, retrying ...')
                time.sleep(NETWORK_WAIT_RETRY)
            else:
                break
        else:
            # Reached only when the deadline passed without a break.
            LOG.warning("No valid network interfaces found. "
                        "Node lookup will probably fail.")

    def serve_ipa_api(self):
        """Serve the API until an extension terminates it."""
        self.api.start()
        if not self.standalone and self.api_url:
            # Don't start heartbeating until the server is listening
            self.heartbeater.start()
        try:
            while self.serve_api:
                eventlet.sleep(0.1)
        except KeyboardInterrupt:
            LOG.info('Caught keyboard interrupt, exiting')
        self.api.stop()

    def process_lookup_data(self, content):
        """Update agent configuration from lookup data.

        :param content: lookup response; must contain ``node`` (with a
                        ``uuid``) and ``config`` (with ``heartbeat_timeout``).
                        ``config`` may also carry ``metrics`` and
                        ``metrics_statsd`` option mappings,
                        ``agent_token_required`` and ``agent_token``.
        """
        self.node = content['node']
        LOG.info('Lookup succeeded, node UUID is %s',
                 self.node['uuid'])
        hardware.cache_node(self.node)
        self.heartbeat_timeout = content['config']['heartbeat_timeout']

        # Update config with values from Ironic
        # (named lookup_config so the imported ``config`` module is not
        # shadowed)
        lookup_config = content.get('config', {})
        # Each group carries its own mapping of option name to value; apply
        # only that mapping to the matching option group.
        metrics_opts = lookup_config.get('metrics')
        if metrics_opts:
            for opt, val in metrics_opts.items():
                setattr(cfg.CONF.metrics, opt, val)
        statsd_opts = lookup_config.get('metrics_statsd')
        if statsd_opts:
            for opt, val in statsd_opts.items():
                setattr(cfg.CONF.metrics_statsd, opt, val)
        if lookup_config.get('agent_token_required'):
            self.agent_token_required = True
        token = lookup_config.get('agent_token')
        if token:
            if len(token) >= 32:
                LOG.debug('Agent token recorded as designated by '
                          'the ironic installation.')
                self.agent_token = token
                # set with-in the API client.
                if not self.standalone:
                    self.api_client.agent_token = token
            elif token == '******':
                LOG.warning('The agent token has already been '
                            'retrieved. IPA may not operate as '
                            'intended and the deployment may fail '
                            'depending on settings in the ironic '
                            'deployment.')
                if not self.agent_token and self.agent_token_required:
                    LOG.error('Ironic is signaling that agent tokens '
                              'are required, however we do not have '
                              'a token on file. '
                              'This is likely **FATAL**.')
            else:
                LOG.info('An invalid token was received.')
        if self.agent_token and not self.standalone:
            # Explicitly set the token in our API client before
            # starting heartbeat operations.
            self.api_client.agent_token = self.agent_token

    def run(self):
        """Run the Ironic Python Agent."""
        LOG.info('Starting ironic-python-agent version: %s',
                 self.version)
        # Get the UUID so we can heartbeat to Ironic. Raises LookupNodeError
        # if there is an issue (uncaught, restart agent)
        self.started_at = _time()
        # Attempt to sync the software clock
        utils.sync_clock(ignore_errors=True)

        # Cached hw managers at runtime, not load time. See bug 1490008.
        hardware.get_managers()
        # Operator-settable delay before hardware actually comes up.
        # Helps with slow RAID drivers - see bug 1582797.
        if self.hardware_initialization_delay > 0:
            LOG.info('Waiting %d seconds before proceeding',
                     self.hardware_initialization_delay)
            time.sleep(self.hardware_initialization_delay)

        if not self.standalone:
            # Inspection should be started before call to lookup, otherwise
            # lookup will fail due to unknown MAC.
            uuid = None
            # We can't try to inspect or heartbeat until we have valid
            # interfaces to perform those actions over.
            self._wait_for_interface()

            if cfg.CONF.inspection_callback_url:
                try:
                    # Attempt inspection. This may fail, and previously
                    # an error would be logged.
                    uuid = inspector.inspect()
                except errors.InspectionError as e:
                    LOG.error('Failed to perform inspection: %s', e)

            if self.api_url:
                content = self.api_client.lookup_node(
                    hardware_info=hardware.list_hardware_info(use_cache=True),
                    timeout=self.lookup_timeout,
                    starting_interval=self.lookup_interval,
                    node_uuid=uuid)
                LOG.debug('Received lookup results: %s', content)
                self.process_lookup_data(content)
            elif cfg.CONF.inspection_callback_url:
                LOG.info('No ipa-api-url configured, Heartbeat and lookup '
                         'skipped for inspector.')
            else:
                # NOTE(TheJulia): Once communication flow capability is
                # able to be driven solely from the conductor, this is no
                # longer a major issue.
                LOG.error('Neither ipa-api-url nor inspection_callback_url '
                          'found, please check your pxe append parameters.')

        self.serve_ipa_api()
        if not self.standalone and self.api_url:
            self.heartbeater.stop()