# Copyright (C) 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# The following code works around issues between kafka-python and eventlet.
# The current release of eventlet (0.19.0) does not actually remove
# select.poll [1]. Because of this, kafka-python's vendored selectors34
# selects PollSelector instead of SelectSelector [2]. PollSelector relies on
# select.poll, which does not work when eventlet/greenlet is used. This
# bug in eventlet is fixed in the master branch [3], but there's no
# release of eventlet that includes this fix at this point.

import json
import threading

import kafka
from kafka.client_async import selectors
import kafka.errors
from oslo_log import log as logging
from oslo_utils import eventletutils
import tenacity

from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import kafka_options
from oslo_messaging._drivers import pool as driver_pool
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LW
from oslo_serialization import jsonutils

# Standard-library logging, imported under the alias ``l`` because the name
# ``logging`` is already bound to oslo_log above.
# NOTE(review): the calls below configure the root logger and change the
# level of third-party loggers as a side effect of importing this module;
# presumably a debugging aid -- confirm whether a library module should
# keep doing this.
import logging as l
# Install a default handler on the root logger at INFO level.
l.basicConfig(level=l.INFO)
# Only let warnings and above through for the "kafka" and "stevedore" loggers.
l.getLogger("kafka").setLevel(l.WARN)
l.getLogger("stevedore").setLevel(l.WARN)

# Selector class handed to every kafka-python client created by this module.
# The library default is used unless eventlet has monkey patched ``select``.
KAFKA_SELECTOR = selectors.DefaultSelector
if eventletutils.is_monkey_patched('select'):
    # monkeypatch the vendored SelectSelector._select like eventlet does
    # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
    from eventlet.green import select
    selectors.SelectSelector._select = staticmethod(select.select)

    # Force to use the select selectors
    KAFKA_SELECTOR = selectors.SelectSelector

# Module logger (oslo_log).
LOG = logging.getLogger(__name__)


def unpack_message(msg):
    """Split a raw JSON payload into its context and message parts.

    The payload is parsed with ``json.loads`` and passed through
    ``driver_common.deserialize_msg``; the entry stored under the
    ``'_context'`` key is then removed from the result.

    :param msg: JSON document to decode
    :returns: tuple ``(context, message)`` where ``message`` no longer
              contains the ``'_context'`` key
    :raises KeyError: if the decoded message has no ``'_context'`` key
    """
    payload = driver_common.deserialize_msg(json.loads(msg))
    ctxt = payload['_context']
    del payload['_context']
    return ctxt, payload


def pack_message(ctxt, msg):
    """Store the context inside ``msg`` and serialize the result.

    Note that ``msg`` is modified in place: the context is written under its
    ``'_context'`` key before serialization.

    :param ctxt: request context, either a dict or an object providing
                 ``to_dict()``
    :param msg: message to pack
    :type msg: dict
    :returns: the value returned by ``driver_common.serialize_msg``
    """
    msg['_context'] = ctxt if isinstance(ctxt, dict) else ctxt.to_dict()
    return driver_common.serialize_msg(msg)


def target_to_topic(target, priority=None):
    """Build the Kafka topic name for a target.

    The topic is ``target.topic``; when a non-empty priority is given it is
    appended after a dot, e.g. ``notifications.info``.

    :param target: Message destination target
    :type target: oslo_messaging.Target
    :param priority: Notification priority
    :type priority: string
    :returns: the topic string
    """
    topic = target.topic
    if priority:
        topic = topic + '.' + priority
    return topic


def retry_on_retriable_kafka_error(exc):
    """Tell whether ``exc`` is a Kafka error flagged as retriable.

    :param exc: the exception to inspect
    :returns: ``False`` when ``exc`` is not a ``kafka.errors.KafkaError``,
              otherwise the value of its ``retriable`` attribute
    """
    if not isinstance(exc, kafka.errors.KafkaError):
        return False
    return exc.retriable


def with_reconnect(retries=None):
    """Build a decorator that retries a call on retriable Kafka errors.

    The decorated callable is invoked again, one second after each failure,
    for as long as it raises an exception accepted by
    ``retry_on_retriable_kafka_error``. Any other exception, and the last
    retriable one once the retry budget is exhausted, is re-raised as is.

    :param retries: number of retries allowed after the first attempt;
                    ``None`` means retry forever, ``0`` means no retry
    :type retries: int or None
    :returns: a decorator applying the retry policy to a callable
    """
    if retries is None:
        # stop_after_attempt(None) would compare the attempt counter against
        # None, which raises TypeError on Python 3 and stops right after the
        # first attempt on Python 2.
        stop = tenacity.stop_never
    else:
        # tenacity counts attempts whereas callers count retries: N retries
        # are N + 1 attempts.
        stop = tenacity.stop_after_attempt(retries + 1)

    def decorator(func):
        @tenacity.retry(
            retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error),
            wait=tenacity.wait_fixed(1),
            stop=stop,
            reraise=True
        )
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    return decorator


class Connection(object):
    """Connection to a Kafka cluster used by the Kafka driver.

    Holds at most one ``kafka.KafkaProducer`` (created on first send) and at
    most one ``kafka.KafkaConsumer`` (created by
    :meth:`declare_topic_consumer`).
    """

    def __init__(self, conf, url, purpose):
        """Read the driver options and resolve the broker addresses.

        :param conf: configuration object exposing the
                     ``oslo_messaging_kafka`` option group
        :param url: transport URL whose ``hosts`` are the Kafka brokers
        :param purpose: purpose of the connection (not used by this class)
        """

        self.client = None
        driver_conf = conf.oslo_messaging_kafka
        self.batch_size = driver_conf.producer_batch_size
        # presumably the option is expressed in seconds; the producer is
        # given the value multiplied by 1000 as ``linger_ms``
        self.linger_ms = driver_conf.producer_batch_timeout * 1000
        self.conf = conf
        self.producer = None
        # serializes creation and tear down of the producer
        self.producer_lock = threading.Lock()
        self.consumer = None
        self.consumer_timeout = float(driver_conf.kafka_consumer_timeout)
        self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
        self.group_id = driver_conf.consumer_group
        self.url = url
        self._parse_url()
        # TODO(Support for manual/auto_commit functionality)
        # When auto_commit is False, consumer can manually notify
        # the completion of the subscription.
        # Currently we don't support for non auto commit option
        self.auto_commit = True
        self._consume_loop_stopped = False

    def _parse_url(self):
        """Fill ``self.hostaddrs`` with ``host:port`` strings.

        Hosts of the URL without a hostname are skipped and a missing port is
        replaced by ``kafka_default_port``. When the URL yields no address,
        the configured default host and port are used instead.
        """
        driver_conf = self.conf.oslo_messaging_kafka
        self.hostaddrs = []

        for host in self.url.hosts:
            if host.hostname:
                self.hostaddrs.append("%s:%s" % (
                    host.hostname,
                    host.port or driver_conf.kafka_default_port))

        if not self.hostaddrs:
            self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
                                             driver_conf.kafka_default_port))

    def notify_send(self, topic, ctxt, msg, retry):
        """Send messages to Kafka broker.

        The producer is closed when sending finally fails, so that the next
        call starts from a fresh producer.

        :param topic: String of the topic
        :param ctxt: context for the messages
        :param msg: messages for publishing
        :param retry: the number of retry; ``None`` or a negative value
                      means no limit
        """
        # ``None >= 0`` raises TypeError on Python 3, so None has to be
        # tested explicitly before comparing.
        if retry is None or retry < 0:
            retry = None
        message = pack_message(ctxt, msg)
        message = jsonutils.dumps(message)
        if not isinstance(message, bytes):
            # Without a value_serializer KafkaProducer.send() only accepts
            # bytes, while dumps() returns text on Python 3.
            message = message.encode('utf-8')

        @with_reconnect(retries=retry)
        def wrapped_with_reconnect():
            self._ensure_producer()
            # NOTE(sileht): This returns a future, we can use get()
            # if we want to block like other driver
            self.producer.send(topic, message)

        try:
            wrapped_with_reconnect()
        except Exception:
            # NOTE(sileht): if something goes wrong close the producer
            # connection
            self._close_producer()
            raise

    @with_reconnect()
    def _poll_messages(self, timeout):
        """Poll the consumer once and return the values of the records.

        :param timeout: poll timeout in seconds (converted to milliseconds
                        for the consumer)
        :returns: non-empty list with the ``value`` of every fetched record
        :raises kafka.errors.ConsumerTimeout: if no record was fetched
        """
        messages = self.consumer.poll(timeout * 1000.0)
        messages = [record.value
                    for records in messages.values() if records
                    for record in records]
        if not messages:
            # NOTE(sileht): really ? you return payload but no messages...
            # simulate timeout to consume message again
            raise kafka.errors.ConsumerTimeout()
        return messages

    def consume(self, timeout=None):
        """Receive the messages fetched by one poll of the consumer.

        :param timeout: poll timeout in seconds; ``None`` or a negative
                        value selects the configured consumer timeout
        :returns: list of record values, or ``None`` when consuming has been
                  stopped or polling failed (the failure is logged)
        :raises driver_common.Timeout: if no message arrived in time
        """
        if self._consume_loop_stopped:
            return None

        # ``None >= 0`` raises TypeError on Python 3, so None has to be
        # tested explicitly before comparing.
        if timeout is None or timeout < 0:
            timeout = self.consumer_timeout
        try:
            messages = self._poll_messages(timeout)
        except kafka.errors.ConsumerTimeout as e:
            # exceptions have no ``message`` attribute on Python 3
            raise driver_common.Timeout(str(e))
        except Exception:
            LOG.exception(_LE("Failed to consume messages"))
            messages = None
        return messages

    def stop_consuming(self):
        """Make every later call to :meth:`consume` return ``None``."""
        self._consume_loop_stopped = True

    def reset(self):
        """Reset a connection so it can be used again."""
        pass

    def close(self):
        """Close the producer and the consumer, if any."""
        self._close_producer()
        if self.consumer:
            self.consumer.close()
            self.consumer = None

    def commit(self):
        """Commit is used by subscribers belonging to the same group.
        After subscribing messages, commit is called to prevent
        the other subscribers which belong to the same group
        from re-subscribing the same messages.

        Currently self.auto_commit option is always True,
        so we don't need to call this function.
        """
        self.consumer.commit()

    def _close_producer(self):
        """Close and forget the producer, if one exists."""
        with self.producer_lock:
            if self.producer:
                self.producer.close()
                self.producer = None

    def _ensure_producer(self):
        """Create the producer unless one already exists."""
        if self.producer:
            return
        with self.producer_lock:
            # checked again under the lock: another thread may have created
            # the producer in the meantime
            if self.producer:
                return
            self.producer = kafka.KafkaProducer(
                bootstrap_servers=self.hostaddrs,
                linger_ms=self.linger_ms,
                batch_size=self.batch_size,
                selector=KAFKA_SELECTOR)

    @with_reconnect()
    def declare_topic_consumer(self, topics, group=None):
        """Create the consumer subscribed to the given topics.

        :param topics: iterable of topic names
        :param group: consumer group; falls back to the configured
                      ``consumer_group`` when empty
        """
        self.consumer = kafka.KafkaConsumer(
            *topics, group_id=(group or self.group_id),
            bootstrap_servers=self.hostaddrs,
            max_partition_fetch_bytes=self.max_fetch_bytes,
            selector=KAFKA_SELECTOR
        )


class OsloKafkaMessage(base.RpcIncomingMessage):
    """Incoming message received from Kafka.

    Neither replying nor requeueing is available: both operations only log a
    warning.
    """

    def __init__(self, ctxt, message):
        """Hand the context and the message over to the base class."""
        super(OsloKafkaMessage, self).__init__(ctxt, message)

    def reply(self, reply=None, failure=None):
        """Log a warning; the arguments are ignored."""
        LOG.warning(_LW("reply is not supported"))

    def requeue(self):
        """Log a warning; the message is not requeued."""
        LOG.warning(_LW("requeue is not supported"))


class KafkaListener(base.PollStyleListener):
    """Poll style listener fed by a Kafka connection."""

    def __init__(self, conn):
        """Wrap ``conn``, the connection messages are consumed from."""
        super(KafkaListener, self).__init__()
        self.conn = conn
        # messages already fetched from the connection but not handed out yet
        self.incoming_queue = []
        self._stopped = threading.Event()

    @base.batch_poll_helper
    def poll(self, timeout=None):
        """Return the next incoming message.

        Queued messages are served first; when the queue is empty the
        connection is asked for more until something arrives.

        :param timeout: timeout passed to the connection's ``consume()``
        :returns: an ``OsloKafkaMessage``, or ``None`` when the listener has
                  been stopped or the connection timed out
        """
        while not self._stopped.is_set():
            if self.incoming_queue:
                return self.incoming_queue.pop(0)
            try:
                # consume() may return None, which is treated as "nothing"
                for raw in self.conn.consume(timeout=timeout) or ():
                    ctxt, payload = unpack_message(raw)
                    self.incoming_queue.append(
                        OsloKafkaMessage(ctxt, payload))
            except driver_common.Timeout:
                return None
        return None

    def stop(self):
        """Flag the listener as stopped and stop the connection's consumer."""
        self._stopped.set()
        self.conn.stop_consuming()

    def cleanup(self):
        """Close the underlying connection."""
        self.conn.close()

    def commit(self):
        """Ask the connection to commit."""
        # TODO(Support for manually/auto commit functionality)
        # It's better to allow users to commit manually and support for
        # self.auto_commit = False option. For now, this commit function
        # is meaningless since user couldn't call this function and
        # auto_commit option is always True.
        self.conn.commit()


class KafkaDriver(base.BaseDriver):
    """Note: Current implementation of this driver is experimental.
    We will have functional and/or integrated testing enabled for this driver.
    """

    def __init__(self, conf, url, default_exchange=None,
                 allowed_remote_exmods=None):
        """Register the Kafka options and build the connection pool.

        All arguments are passed to the base driver; the pool sizes and TTL
        are read from the ``oslo_messaging_kafka`` option group.
        """
        super(KafkaDriver, self).__init__(
            conf, url, default_exchange, allowed_remote_exmods)

        kafka_options.register_opts(conf)
        # the pool configuration properties
        pool_conf = self.conf.oslo_messaging_kafka
        self.connection_pool = driver_pool.ConnectionPool(
            self.conf,
            pool_conf.pool_size,
            pool_conf.conn_pool_min_size,
            pool_conf.conn_pool_ttl,
            self._url, Connection)
        self.listeners = []

    def cleanup(self):
        """Close every tracked listener and forget them."""
        # NOTE(review): nothing visible in this module adds entries to
        # self.listeners -- confirm whether listeners are meant to be tracked.
        for listener in self.listeners:
            listener.close()
        self.listeners = []

    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
             retry=None):
        """RPC sending is not available with this driver.

        :raises NotImplementedError: always
        """
        raise NotImplementedError(
            'The RPC implementation for Kafka is not implemented')

    def send_notification(self, target, ctxt, message, version, retry=None):
        """Send notification to Kafka brokers

        :param target: Message destination target
        :type target: oslo_messaging.Target
        :param ctxt: Message context
        :type ctxt: dict
        :param message: Message payload to pass
        :type message: dict
        :param version: Messaging API version (currently not used)
        :type version: str
        :param retry: retry setting, handed over unchanged to the
                      connection's ``notify_send()``
        :type retry: int
        """
        with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn:
            topic = target_to_topic(target)
            conn.notify_send(topic, ctxt, message, retry)

    def listen(self, target, batch_size, batch_timeout):
        """RPC listening is not available with this driver.

        :raises NotImplementedError: always
        """
        raise NotImplementedError(
            'The RPC implementation for Kafka is not implemented')

    def listen_for_notifications(self, targets_and_priorities, pool,
                                 batch_size, batch_timeout):
        """Listen to a specified list of targets on Kafka brokers

        :param targets_and_priorities: List of pairs (target, priority);
                                       each pair is turned into a kafka
                                       topic by ``target_to_topic``
        :type targets_and_priorities: list
        :param pool: consumer group of Kafka consumers
        :type pool: string
        :param batch_size: passed to the listener adapter
        :param batch_timeout: passed to the listener adapter
        :returns: a ``base.PollStyleListenerAdapter`` wrapping a
                  ``KafkaListener``
        """
        conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN)
        # a set, so that duplicated pairs subscribe to a topic only once
        topics = {target_to_topic(target, priority)
                  for target, priority in targets_and_priorities}

        conn.declare_topic_consumer(topics, pool)

        return base.PollStyleListenerAdapter(KafkaListener(conn), batch_size,
                                             batch_timeout)

    def _get_connection(self, purpose):
        """Return a connection context from the pool for ``purpose``."""
        return driver_common.ConnectionContext(self.connection_pool, purpose)