# Copyright 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import ipaddress
import random
import socket
import threading
import time
from urllib import parse as urlparse

import eventlet
from ironic_lib import exception as lib_exc
from ironic_lib import mdns
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log
import pkg_resources

from ironic_python_agent.api import app
from ironic_python_agent import config
from ironic_python_agent import encoding
from ironic_python_agent import errors
from ironic_python_agent.extensions import base
from ironic_python_agent import hardware
from ironic_python_agent import inspector
from ironic_python_agent import ironic_api_client
from ironic_python_agent import utils

# Module-level logger shared by the agent and the heartbeater thread.
LOG = log.getLogger(__name__)

# Time (in seconds) to wait for any of the interfaces to be up
# before lookup of the node is attempted
NETWORK_WAIT_TIMEOUT = 60

# Time (in seconds) to sleep between two checks for a usable interface
# while waiting for the network to come up
NETWORK_WAIT_RETRY = 5

# Import the ironic_lib metrics option groups so that cfg.CONF.metrics and
# cfg.CONF.metrics_statsd can be referenced when lookup data is processed.
cfg.CONF.import_group('metrics', 'ironic_lib.metrics_utils')
cfg.CONF.import_group('metrics_statsd', 'ironic_lib.metrics_statsd')

# (hostname, port) pair; used in this module for the agent's advertised
# address, whose hostname is None until an IP address has been found.
Host = collections.namedtuple('Host', ['hostname', 'port'])


def _time():
    """Wraps time.time() for simpler testing."""
    return time.time()


class IronicPythonAgentStatus(encoding.Serializable):
    """Serializable snapshot of the agent's status.

    Carries the time the agent was started and the agent version; these
    two attributes are the ones listed in ``serializable_fields``.
    """

    serializable_fields = ('started_at', 'version')

    def __init__(self, started_at, version):
        """Store the status values.

        :param started_at: time at which the agent was started.
        :param version: version string of the running agent.
        """
        self.version = version
        self.started_at = started_at


class IronicPythonAgentHeartbeater(threading.Thread):
    """Thread that periodically heartbeats to Ironic."""

    # If we could wait at most N seconds between heartbeats (or in case of an
    # error) we will instead wait r x N seconds, where r is a random value
    # between these multipliers.
    min_jitter_multiplier = 0.3
    max_jitter_multiplier = 0.6

    def __init__(self, agent):
        """Initialize the heartbeat thread.

        :param agent: an :class:`ironic_python_agent.agent.IronicPythonAgent`
                      instance.
        """
        super().__init__()
        self.agent = agent
        self.stop_event = threading.Event()
        self.api = agent.api_client
        # Seconds to wait between regular heartbeats; recomputed (with
        # jitter) after every heartbeat attempt. Zero makes the first
        # heartbeat happen immediately.
        self.interval = 0
        self.heartbeat_forced = False
        # Time of the last *successful* heartbeat.
        self.previous_heartbeat = 0

    def run(self):
        """Start the heartbeat thread.

        Resolves the agent's advertised address, then loops until
        :meth:`stop` is called, waking up at least every 5 seconds to
        check whether a heartbeat is due.
        """
        # The first heartbeat happens immediately
        LOG.info('Starting heartbeater')
        self.agent.set_agent_advertise_addr()

        while not self.stop_event.wait(min(self.interval, 5)):
            if self._heartbeat_expected():
                self.do_heartbeat()
            eventlet.sleep(0)

    def _heartbeat_expected(self):
        """Tell whether a heartbeat should be sent now.

        :returns: True if the current interval has elapsed since the last
                  successful heartbeat, or if a heartbeat has been forced
                  and more than 5 seconds have passed since the last
                  successful one; False otherwise.
        """
        # Normal heartbeating
        if _time() > self.previous_heartbeat + self.interval:
            return True

        # Forced heartbeating, but once in 5 seconds
        if (self.heartbeat_forced
                and _time() > self.previous_heartbeat + 5):
            return True

        return False

    def do_heartbeat(self):
        """Send a heartbeat to Ironic.

        Errors are logged and never propagated. ``previous_heartbeat`` and
        ``heartbeat_forced`` are only updated on success, so a failed
        heartbeat is considered due again on the next wake-up of
        :meth:`run`. The interval is re-randomized after every attempt,
        successful or not.
        """
        try:
            self.api.heartbeat(
                uuid=self.agent.get_node_uuid(),
                advertise_address=self.agent.advertise_address,
                advertise_protocol=self.agent.advertise_protocol,
                generated_cert=self.agent.generated_cert,
            )
            LOG.info('heartbeat successful')
            self.heartbeat_forced = False
            self.previous_heartbeat = _time()
        except errors.HeartbeatConflictError:
            LOG.warning('conflict error sending heartbeat to %s',
                        self.agent.api_url)
        except Exception:
            LOG.exception('error sending heartbeat to %s', self.agent.api_url)
        finally:
            interval_multiplier = random.uniform(self.min_jitter_multiplier,
                                                 self.max_jitter_multiplier)
            self.interval = self.agent.heartbeat_timeout * interval_multiplier
            LOG.info('sleeping before next heartbeat, interval: %s',
                     self.interval)

    def force_heartbeat(self):
        """Request an out-of-schedule heartbeat.

        The request is honoured by :meth:`run` on one of its next wake-ups,
        subject to the 5 second limit in :meth:`_heartbeat_expected`.
        """
        self.heartbeat_forced = True

    def stop(self):
        """Stop the heartbeat thread and wait for it to finish."""
        LOG.info('stopping heartbeater')
        self.stop_event.set()
        return self.join()


class IronicPythonAgent(base.ExecuteCommandMixin):
    """Class for base agent functionality."""

    def __init__(self, api_url, advertise_address, listen_address,
                 ip_lookup_attempts, ip_lookup_sleep, network_interface,
                 lookup_timeout, lookup_interval, standalone, agent_token,
                 hardware_initialization_delay=0, advertise_protocol='http'):
        """Create the agent and, when possible, its Ironic API client.

        :param api_url: URL of the Ironic API. When empty or set to 'mdns'
                        (and the agent is not standalone) the endpoint is
                        looked up via mDNS instead.
        :param advertise_address: object with ``hostname`` and ``port``
                                  attributes that the agent advertises.
        :param listen_address: address the agent API listens on.
        :param ip_lookup_attempts: number of attempts made to find the
                                   advertised IP address.
        :param ip_lookup_sleep: seconds to sleep between those attempts.
        :param network_interface: interface to take the advertised IP from,
                                  or None to derive it from the route to
                                  Ironic.
        :param lookup_timeout: node lookup timeout in seconds.
        :param lookup_interval: starting interval for node lookup retries.
        :param standalone: when true, the agent does not look up the node
                           in Ironic nor heartbeat to it.
        :param agent_token: agent token known at start-up, if any.
        :param hardware_initialization_delay: seconds to wait on start-up
                                              before touching hardware.
        :param advertise_protocol: protocol advertised in heartbeats.
        :raises: ServiceLookupFailure if mDNS was explicitly requested and
                 the lookup failed.
        """
        super().__init__()
        key_configured = bool(cfg.CONF.keyfile)
        cert_configured = bool(cfg.CONF.certfile)
        if key_configured != cert_configured:
            LOG.warning("Only one of 'keyfile' and 'certfile' options is "
                        "defined in config file. Its value will be ignored.")
        self.ext_mgr = base.init_ext_manager(self)
        self.api_url = api_url
        if not standalone and (not self.api_url or self.api_url == 'mdns'):
            try:
                endpoint, params = mdns.get_endpoint('baremetal')
            except lib_exc.ServiceLookupFailure:
                if self.api_url:
                    # mDNS explicitly requested, report failure.
                    raise
                # implicit fallback to mDNS, do not fail (maybe we're only
                # running inspection).
                LOG.warning('Could not get baremetal endpoint from mDNS, '
                            'will not heartbeat')
            else:
                self.api_url = endpoint
                config.override(params)

        # The API client and the heartbeater only exist when an API URL is
        # known.
        if self.api_url:
            self.api_client = ironic_api_client.APIClient(self.api_url)
            self.heartbeater = IronicPythonAgentHeartbeater(self)
        self.listen_address = listen_address
        self.advertise_address = advertise_address
        self.advertise_protocol = advertise_protocol
        distribution = pkg_resources.get_distribution('ironic-python-agent')
        self.version = distribution.version
        self.api = app.Application(self, cfg.CONF)
        self.heartbeat_timeout = None
        self.started_at = None
        self.node = None
        # lookup timeout in seconds
        self.lookup_timeout = lookup_timeout
        self.lookup_interval = lookup_interval
        self.ip_lookup_attempts = ip_lookup_attempts
        self.ip_lookup_sleep = ip_lookup_sleep
        self.network_interface = network_interface
        self.standalone = standalone
        self.hardware_initialization_delay = hardware_initialization_delay
        # IPA will stop serving requests and exit after this is set to False
        self.serve_api = True
        self.agent_token = agent_token
        # Allows this to be turned on by the conductor while running,
        # in the event of long running ramdisks where the conductor
        # got upgraded somewhere along the way.
        self.agent_token_required = cfg.CONF.agent_token_required
        self.generated_cert = None
    def get_status(self):
        """Build a serializable description of the agent's status.

        :returns: an :class:`IronicPythonAgentStatus` instance holding the
                  agent's start time and version.
        """
        status = IronicPythonAgentStatus(started_at=self.started_at,
                                         version=self.version)
        return status

    def validate_agent_token(self, token):
        """Check a token supplied with a request against the agent's token.

        :param token: the token received with the request, or None.
        :returns: True if the token is accepted, False otherwise.
        """
        # The regular case: a token was supplied, or we hold no token, or
        # tokens are mandatory. Then the tokens simply have to match.
        if token or not self.agent_token or self.agent_token_required:
            return self.agent_token == token

        # We did not get a token, i.e. None and
        # we've previously seen a token, which is
        # a mid-cluster upgrade case with long-running ramdisks.
        # TODO(TheJulia): Rip this out during or after the V cycle.
        LOG.warning('Agent token for requests are not required '
                    'by the conductor, yet we received a token. '
                    'Cluster may be mid-upgrade. Support to '
                    'not fail in this condition will be removed in '
                    'the Victoria development cycle.')
        # Tell the API everything is okay.
        return True

    def _get_route_source(self, dest):
        """Get the source IP address used to send packets to a destination.

        Runs ``ip route get <dest>`` and takes the value following ``src``
        on the first line of its output.

        :param dest: destination host passed to ``ip route get``.
        :returns: the source address as a string, or None if the command
                  failed, its output had no ``src`` field, or the source is
                  a link-local address. A source that cannot be parsed as
                  an IP address is logged and still returned.
        """
        try:
            stdout, _stderr = utils.execute('ip', 'route', 'get', dest)
        except (EnvironmentError, processutils.ProcessExecutionError) as exc:
            LOG.warning('Cannot get route to host %(dest)s: %(err)s',
                        {'dest': dest, 'err': exc})
            return None

        try:
            first_record = stdout.strip().split('\n')[0]
            after_src = first_record.split('src')[1]
            source = after_src.split()[0]
        except IndexError:
            LOG.warning('No route to host %(dest)s, route record: %(rec)s',
                        {'dest': dest, 'rec': stdout})
            return None

        try:
            address = ipaddress.ip_address(source)
            if address.is_link_local:
                LOG.info('Ignoring link-local source to %(dest)s: %(rec)s',
                         {'dest': dest, 'rec': stdout})
                return None
        except ValueError as exc:
            LOG.warning('Invalid IP address %(addr)s returned as a route '
                        'to host %(dest)s: %(err)s',
                        {'dest': dest, 'addr': source, 'err': exc})

        return source

    def set_agent_advertise_addr(self):
        """Set advertised IP address for the agent, if not already set.

        If the advertised hostname is still the default (None), try to find
        a better one: either the IPv4 address of the configured network
        interface or, when no interface is configured, the source address
        of the route to the Ironic API host. The route lookup is retried up
        to ``ip_lookup_attempts`` times, sleeping ``ip_lookup_sleep``
        seconds between attempts.

        :raises: LookupAgentIPError if an IP address could not be found
        """
        if self.advertise_address.hostname is not None:
            return

        found_ip = None
        if self.network_interface is not None:
            # TODO(dtantsur): deprecate this
            found_ip = hardware.dispatch_to_managers('get_ipv4_addr',
                                                     self.network_interface)
        else:
            url = urlparse.urlparse(self.api_url)
            ironic_host = url.hostname
            # Try resolving it in case it's not an IP address
            try:
                ironic_host = socket.gethostbyname(ironic_host)
            except socket.gaierror:
                LOG.debug('Could not resolve %s, maybe no DNS', ironic_host)

            last_attempt = self.ip_lookup_attempts - 1
            for attempt in range(self.ip_lookup_attempts):
                found_ip = self._get_route_source(ironic_host)
                if found_ip:
                    break

                # No point in sleeping after the final attempt: the error
                # below is raised right away.
                if attempt < last_attempt:
                    time.sleep(self.ip_lookup_sleep)

        if found_ip:
            self.advertise_address = Host(hostname=found_ip,
                                          port=self.advertise_address.port)
        else:
            raise errors.LookupAgentIPError('Agent could not find a valid IP '
                                            'address.')

    def get_node_uuid(self):
        """Get UUID for Ironic node.

        The UUID is read from the node data stored on the agent; until
        that data is available (or if it carries no ``uuid`` key) this
        raises an exception.

        :returns: A string containing the UUID for the Ironic node.
        :raises: UnknownNodeError if UUID is unknown.
        """
        node = self.node
        if node is not None and 'uuid' in node:
            return node['uuid']
        raise errors.UnknownNodeError()

    def list_command_results(self):
        """Return all command results known to the agent.

        :returns: a new list of :class:`ironic_python_agent.extensions.base.
                  BaseCommandResult` objects.
        """
        return [result for result in self.command_results.values()]

    def get_command_result(self, result_id):
        """Look up a single command result by its ID.

        :param result_id: ID of the command result to return.
        :returns: a :class:`ironic_python_agent.extensions.base.
                  BaseCommandResult` object.
        :raises: RequestedObjectNotFoundError if command with the given ID
                 is not found.
        """
        try:
            result = self.command_results[result_id]
        except KeyError:
            raise errors.RequestedObjectNotFoundError('Command Result',
                                                      result_id)
        return result

    def force_heartbeat(self):
        """Ask the heartbeater to send a heartbeat as soon as possible.

        Does nothing when the agent is standalone or has no Ironic API URL:
        the heartbeater is only created when an API URL is known, so there
        is nothing to notify in those cases.
        """
        if not self.standalone and self.api_url:
            self.heartbeater.force_heartbeat()

    def _wait_for_interface(self):
        """Wait until at least one interface is up.

        Polls the hardware managers every NETWORK_WAIT_RETRY seconds for up
        to NETWORK_WAIT_TIMEOUT seconds, returning as soon as any listed
        interface reports a MAC address. A warning is logged if the timeout
        expires first.
        """
        deadline = time.time() + NETWORK_WAIT_TIMEOUT
        while time.time() < deadline:
            interfaces = hardware.dispatch_to_managers(
                'list_network_interfaces')
            if any(ifc.mac_address for ifc in interfaces):
                return

            LOG.debug('Network is not up yet. '
                      'No valid interfaces found, retrying ...')
            time.sleep(NETWORK_WAIT_RETRY)

        LOG.warning("No valid network interfaces found. "
                    "Node lookup will probably fail.")

    def _start_auto_tls(self):
        """Generate TLS parameters automatically, if enabled and supported.

        On success the generated certificate text is stored in
        ``generated_cert`` and the advertised protocol is switched to
        'https'.

        :returns: a tuple (certificate path, private key path), or
                  (None, None) when automated TLS is not used.
        """
        # NOTE(dtantsur): if listen_tls is True, assume static TLS
        # configuration and don't auto-generate anything.
        auto_tls_enabled = (not cfg.CONF.listen_tls
                            and cfg.CONF.enable_auto_tls)
        if not auto_tls_enabled:
            LOG.debug('Automated TLS is disabled')
            return None, None

        if not (self.api_url and self.api_client.supports_auto_tls()):
            LOG.warning('Ironic does not support automated TLS')
            return None, None

        # The certificate is generated for the advertised IP, so make sure
        # it is known first.
        self.set_agent_advertise_addr()
        advertised_ip = self.advertise_address.hostname

        LOG.info('Generating TLS parameters automatically for IP %s',
                 advertised_ip)
        tls_info = hardware.dispatch_to_managers(
            'generate_tls_certificate', self.advertise_address.hostname)
        self.generated_cert = tls_info.text
        self.advertise_protocol = 'https'
        return tls_info.path, tls_info.private_key_path

    def serve_ipa_api(self):
        """Serve the API until an extension terminates it.

        Starts the API (with automatically generated TLS parameters when
        available), starts heartbeating if applicable, then idles until
        ``serve_api`` becomes false or a keyboard interrupt is received,
        and finally stops the API.
        """
        tls_cert, tls_key = self._start_auto_tls()
        self.api.start(tls_cert, tls_key)

        heartbeating = not self.standalone and self.api_url
        if heartbeating:
            # Don't start heartbeating until the server is listening
            self.heartbeater.start()

        try:
            while self.serve_api:
                eventlet.sleep(0.1)
        except KeyboardInterrupt:
            LOG.info('Caught keyboard interrupt, exiting')

        self.api.stop()

    def process_lookup_data(self, content):
        """Update agent configuration from lookup data.

        Stores and caches the node, records the heartbeat timeout, applies
        the ``metrics`` and ``metrics_statsd`` option values sent by Ironic
        to the corresponding configuration groups and records the agent
        token.

        :param content: lookup result; a dict with a ``node`` key and a
                        ``config`` key, the latter holding at least
                        ``heartbeat_timeout``.
        :raises: KeyError if ``node``, ``config`` or ``heartbeat_timeout``
                 is missing from the lookup result.
        """

        self.node = content['node']
        LOG.info('Lookup succeeded, node UUID is %s',
                 self.node['uuid'])
        hardware.cache_node(self.node)
        self.heartbeat_timeout = content['config']['heartbeat_timeout']

        # Update config with values from Ironic.
        # NOTE: deliberately not called "config" so the imported
        # ironic_python_agent.config module is not shadowed.
        lookup_config = content.get('config', {})

        # Each group's options live in the nested dict under the group's
        # name; only those are applied to the matching config group.
        metrics_opts = lookup_config.get('metrics')
        if metrics_opts:
            for opt, val in metrics_opts.items():
                setattr(cfg.CONF.metrics, opt, val)
        statsd_opts = lookup_config.get('metrics_statsd')
        if statsd_opts:
            for opt, val in statsd_opts.items():
                setattr(cfg.CONF.metrics_statsd, opt, val)

        if lookup_config.get('agent_token_required'):
            self.agent_token_required = True
        token = lookup_config.get('agent_token')
        if token:
            if len(token) >= 32:
                LOG.debug('Agent token recorded as designated by '
                          'the ironic installation.')
                self.agent_token = token
                # set with-in the API client.
                if not self.standalone:
                    self.api_client.agent_token = token
            elif token == '******':
                # Ironic masks the token once it has been handed out.
                LOG.warning('The agent token has already been '
                            'retrieved. IPA may not operate as '
                            'intended and the deployment may fail '
                            'depending on settings in the ironic '
                            'deployment.')
                if not self.agent_token and self.agent_token_required:
                    LOG.error('Ironic is signaling that agent tokens '
                              'are required, however we do not have '
                              'a token on file. '
                              'This is likely **FATAL**.')
            else:
                LOG.info('An invalid token was received.')
        if self.agent_token and not self.standalone:
            # Explicitly set the token in our API client before
            # starting heartbeat operations.
            self.api_client.agent_token = self.agent_token

    def run(self):
        """Run the Ironic Python Agent.

        Records the start time, syncs the clock, loads the hardware
        managers and optionally waits for hardware to settle. Unless the
        agent is standalone it then waits for a network interface, runs
        inspection if a callback URL is configured and looks the node up
        in Ironic if an API URL is known. Finally the agent API is served
        until it is asked to stop, after which the heartbeater is stopped.
        """
        LOG.info('Starting ironic-python-agent version: %s',
                 self.version)
        self.started_at = _time()
        # Attempt to sync the software clock
        utils.sync_clock(ignore_errors=True)

        # Cached hw managers at runtime, not load time. See bug 1490008.
        hardware.get_managers()
        # Operator-settable delay before hardware actually comes up.
        # Helps with slow RAID drivers - see bug 1582797.
        if self.hardware_initialization_delay > 0:
            LOG.info('Waiting %d seconds before proceeding',
                     self.hardware_initialization_delay)
            time.sleep(self.hardware_initialization_delay)

        if not self.standalone:
            # Inspection should be started before call to lookup, otherwise
            # lookup will fail due to unknown MAC.
            uuid = None
            # We can't try to inspect or heartbeat until we have valid
            # interfaces to perform those actions over.
            self._wait_for_interface()

            if cfg.CONF.inspection_callback_url:
                try:
                    # Attempt inspection. This may fail, and previously
                    # an error would be logged.
                    uuid = inspector.inspect()
                except errors.InspectionError as e:
                    LOG.error('Failed to perform inspection: %s', e)

            if self.api_url:
                # Get the UUID so we can heartbeat to Ironic. Raises
                # LookupNodeError if there is an issue (uncaught, restart
                # agent)
                content = self.api_client.lookup_node(
                    hardware_info=hardware.list_hardware_info(use_cache=True),
                    timeout=self.lookup_timeout,
                    starting_interval=self.lookup_interval,
                    node_uuid=uuid)
                LOG.debug('Received lookup results: %s', content)
                self.process_lookup_data(content)
                # Save the API url in case we need it later.
                hardware.save_api_client(
                    self.api_client, self.lookup_timeout,
                    self.lookup_interval)

            elif cfg.CONF.inspection_callback_url:
                LOG.info('No ipa-api-url configured, Heartbeat and lookup '
                         'skipped for inspector.')
            else:
                # NOTE(TheJulia): Once communication flow capability is
                # able to be driven solely from the conductor, this is no
                # longer a major issue.
                LOG.error('Neither ipa-api-url nor inspection_callback_url '
                          'found, please check your pxe append parameters.')

        self.serve_ipa_api()

        if not self.standalone and self.api_url:
            self.heartbeater.stop()